blob: e28a3db8a3c2db2db051b21bf167254993905f8a [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
R David Murrayf2ad1732014-12-25 18:36:56 -05007import getpass
Fred Drake38c2ef02001-07-17 20:52:51 +00008import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00009import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +000010import sys
Brian Curtineb24d742010-04-12 17:16:38 +000011import signal
12import subprocess
13import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000014import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000015from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000016import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000017import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import platform
19import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000020import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import asyncore
22import asynchat
23import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010024import itertools
25import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050026import locale
27import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070028import decimal
29import fractions
Christian Heimes25827622013-10-12 01:27:08 +020030import pickle
Victor Stinner4d6a3d62014-12-21 01:16:38 +010031import sysconfig
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000032try:
33 import threading
34except ImportError:
35 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020036try:
37 import resource
38except ImportError:
39 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020040try:
41 import fcntl
42except ImportError:
43 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010044try:
45 import _winapi
46except ImportError:
47 _winapi = None
# Group ids the current user belongs to (supplementary memberships from the
# group database plus the process's own gid).  Empty when the grp module is
# not available on this platform.
try:
    import grp
    _group_entries = grp.getgrall()
    # Look the user name up once rather than once per group entry, and only
    # when there is at least one entry to compare it against.
    _current_user = getpass.getuser() if _group_entries else None
    groups = [g.gr_gid for g in _group_entries if _current_user in g.gr_mem]
    if hasattr(os, 'getgid'):
        process_gid = os.getgid()
        if process_gid not in groups:
            groups.append(process_gid)
except ImportError:
    groups = []

# Every uid listed in the password database; empty when the pwd module is
# not available on this platform.
try:
    import pwd
    all_users = [u.pw_uid for u in pwd.getpwall()]
except ImportError:
    all_users = []
62try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020064except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020065 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020066
Georg Brandl2daf6ae2012-02-20 19:54:16 +010067from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000068
# True only when the platform exposes geteuid() and the effective uid is 0.
root_in_posix = False
if hasattr(os, 'geteuid'):
    root_in_posix = (os.geteuid() == 0)

# Ask for float timestamps where the (deprecated) switch still exists; on
# interpreters without os.stat_float_times() there is nothing to toggle, so
# skip the call instead of failing at import time.
if hasattr(os, 'stat_float_times'):
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        os.stat_float_times(True)
st = os.stat(__file__)
stat_supports_subsecond = (
    # check if float and int timestamps are different
    (st.st_atime != st[7])
    or (st.st_mtime != st[8])
    or (st.st_ctime != st[9]))

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
if hasattr(sys, 'thread_info') and sys.thread_info.version:
    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
else:
    USING_LINUXTHREADS = False

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
94
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file-descriptor calls that create TESTFN."""

    def setUp(self):
        # Remove any leftover TESTFN; the same routine doubles as tearDown.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failed os.rename() must not change the refcount of its argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        with open(support.TESTFN, "wb") as fp:
            fp.write(b'test')
        self.addCleanup(support.unlink, support.TESTFN)

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close even when a check above fails, so the descriptor is not
            # left open when tearDown unlinks TESTFN.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        # The destination already exists; replace() must overwrite it.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
221
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200222
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on os.stat()/os.statvfs() results and on os.utime().

    Each test runs against a fresh directory support.TESTFN containing one
    three-byte file, self.fname.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        """Check index access, attribute access and immutability of os.stat(fname)."""
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                if name.endswith("TIME"):
                    # The attribute may be a float while the tuple slot is
                    # compared as an int, so truncate before comparing.
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                 result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Both outcomes pass:
        # a TypeError is tolerated but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    def _statvfs_or_skip(self):
        """Return os.statvfs(self.fname); skip the test when it reports ENOSYS.

        Any other OSError is re-raised so the real failure is reported
        rather than an unbound-variable error further down the test.
        """
        try:
            return os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            raise

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = self._statvfs_or_skip()

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  Both outcomes pass:
        # a TypeError is tolerated but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = self._statvfs_or_skip()

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def _test_utime(self, filename, attr, utime, delta):
        """Exercise the three ways of calling utime on *filename*.

        attr(stat_result, name) reads a timestamp field, utime(path, times)
        sets explicit times, and *delta* is the tolerated difference between
        two consecutive "set to now" calls, in the unit attr() returns.
        """
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Stat the path that was just touched, so st2 and st3 describe the
        # same object.
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)

    def _test_utime_ns(self, set_times_ns, test_dir=True):
        """Run _test_utime on the *_ns fields, optionally on the directory too."""
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    requires_utime_dir_fd = unittest.skipUnless(
                                os.utime in os.supports_dir_fd,
                                "dir_fd support for utime required for this test.")
    requires_utime_fd = unittest.skipUnless(
                                os.utime in os.supports_fd,
                                "fd support for utime required for this test.")
    requires_utime_nofollow_symlinks = unittest.skipUnless(
                                os.utime in os.supports_follow_symlinks,
                                "follow_symlinks support for utime required for this test.")

    @requires_utime_nofollow_symlinks
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.utime(file, ns=times, follow_symlinks=False)
        self._test_utime_ns(lutimes_ns)

    @requires_utime_fd
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.utime(f.fileno(), ns=times)
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        # Passing both positional times and ns= must be rejected.
        with self.assertRaises(ValueError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)

    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        """Set millisecond-precision times via set_time_func and read them back."""
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        # Only toggle float timestamps where the deprecated switch exists.
        if hasattr(os, 'stat_float_times'):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    # NOTE(review): the body below is identical to test_futimes_subsecond;
    # kept as a separately named test -- confirm whether it was meant to
    # exercise a different call.
    @requires_utime_fd
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                             times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_utime_nofollow_symlinks
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime), follow_symlinks=False)
        self._test_utime_subsecond(set_time)

    # NOTE(review): the body below is identical to test_futimesat_subsecond;
    # kept as a separately named test -- confirm whether it was meant to
    # exercise a different call.
    @requires_utime_dir_fd
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                             times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict tests to Win32, since there is no guarantee other
    # systems support centiseconds
    def get_file_system(path):
        # Plain function (no self): it is called from the decorators below
        # while the class body is being executed.  Returns the file system
        # name of the volume holding *path* on win32, and None otherwise or
        # when the volume query fails.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_1565150(self):
        t1 = 1159195039.25
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_large_time(self):
        t1 = 5000000000 # some day in 2128
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        """Require an int st_file_attributes that fits in 32 unsigned bits."""
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        result = os.stat(support.TESTFN)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)
604
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000605from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000606
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol.

    Every test runs against a snapshot/restore of the real process
    environment: setUp() saves os.environ (and os.environb where supported)
    and installs the reference keys, tearDown() puts the saved values back.
    """
    # NOTE(review): presumably None tells the inherited mapping tests that
    # there is no constructible mapping type to test -- confirm against
    # mapping_tests.BasicTestMappingProtocol.
    type2test = None

    def setUp(self):
        # Snapshot the environment so that tearDown() can restore it.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        # Make sure the reference items are present for the tests below.
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        # Drop whatever the test left behind and restore the snapshot.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        # Known key/value pairs; installed by setUp() and read back by
        # test_items().
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        # Returns os.environ itself after emptying it (not a copy).
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
    def test_update2(self):
        # A variable set through os.environ.update() must be visible to a
        # child process started afterwards.
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
    def test_os_popen_iter(self):
        # The object returned by os.popen() must be iterable line by line
        # and raise StopIteration once the output is exhausted.
        with os.popen(
            "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
            it = iter(popen)
            self.assertEqual(next(it), "line1\n")
            self.assertEqual(next(it), "line2\n")
            self.assertEqual(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        # Every reference item installed by setUp() must be readable.
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily replace os.environ with a plain dict; the finally
        # clause guarantees the real object is put back.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            # The filesystem encoding cannot represent the test character.
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        # Deleting an invalid variable name must raise; the exception type
        # expected here differs between Windows and other platforms.
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # Lookup of a missing key: the KeyError must carry the very same
        # key object that was passed in, with its context suppressed.
        with self.assertRaises(KeyError) as cm:
            os.environ[missing]
        self.assertIs(cm.exception.args[0], missing)
        self.assertTrue(cm.exception.__suppress_context__)

        # Same requirements when deleting a missing key.
        with self.assertRaises(KeyError) as cm:
            del os.environ[missing]
        self.assertIs(cm.exception.args[0], missing)
        self.assertTrue(cm.exception.__suppress_context__)
761
Victor Stinner6d101392013-04-14 16:35:04 +0200762
class WalkTests(unittest.TestCase):
    """Tests for os.walk().

    The directory tree is built *and* the os.walk() assertions are made in
    setUp(), so subclasses that inherit setUp() repeat those checks before
    each of their own tests.  tearDown() removes the tree with a bottom-up
    walk.
    """

    def setUp(self):
        from os.path import join

        # setUp() contains assertions; when one of them fails unittest does
        # not call tearDown(), but registered cleanups still run.  Register
        # a removal of the tree so a failure does not leak it into the
        # following tests.  After a normal tearDown() the directory is
        # already gone and this is a no-op.
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link     a symlink to a missing target
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        broken_link_path = join(sub2_path, "broken_link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), link_path)
            os.symlink('broken', broken_link_path, True)
            # Without followlinks, "link" is reported as a directory and
            # the dangling "broken_link" as a file.
            sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        walked[1][-1].sort()
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory shows up in dirs but has to be
                # removed as a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000877
Charles-François Natali7372b062012-02-05 15:15:38 +0100878
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk().

    Inherits the directory tree built by WalkTests.setUp() and compares
    os.fwalk() against os.walk() on it.
    """

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then walk() and fwalk() are run for
        every combination of topdown and follow-symlinks, and each root
        yielded by fwalk() must have been yielded by walk() with the same
        sets of directory and file names.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # The two functions spell the symlink option differently.
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor *before* entering the try block: if os.open()
        # fails there is nothing to close, and the finally clause must not
        # hide the real error behind a NameError on an unbound "fd".
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Do not follow symlinks: a link to a directory must be
                # unlinked, only real directories are rmdir()ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
952
953
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), including the exist_ok argument."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # The umask is process-wide state: restore it even when an
        # assertion fails so that later tests are not affected.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            # An existing directory is accepted whatever mode is requested.
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Always remove the file again: tearDown() uses os.removedirs(),
        # which cannot remove a regular file sitting at this path.
        try:
            # A regular file in the way is an error whatever exist_ok says.
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1032
Andrew Svetlov405faed2012-12-25 12:18:09 +02001033
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown() on a directory shared by the whole class."""

    # setUpClass/tearDownClass are declared as proper classmethods: the
    # unittest documentation requires it, and a plain zero-argument function
    # only works as long as it is looked up on the class itself.
    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        # "st" rather than "stat", to avoid shadowing the stat module.
        st = os.stat(support.TESTFN)
        uid = st.st_uid
        gid = st.st_gid
        # Numbers that are not integers must be rejected for both arguments.
        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        os.chown(support.TESTFN, uid, gid_1)
        gid = os.stat(support.TESTFN).st_gid
        self.assertEqual(gid, gid_1)
        os.chown(support.TESTFN, uid, gid_2)
        gid = os.stat(support.TESTFN).st_gid
        self.assertEqual(gid, gid_2)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        os.chown(support.TESTFN, uid_1, gid)
        uid = os.stat(support.TESTFN).st_uid
        self.assertEqual(uid, uid_1)
        os.chown(support.TESTFN, uid_2, gid)
        uid = os.stat(support.TESTFN).st_uid
        self.assertEqual(uid, uid_2)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls deliberately share one context manager: the test passes
        # as soon as either of them is refused, so it still works when the
        # first call happens to be permitted.
        # NOTE(review): presumably that covers uid_1 being the current
        # user -- confirm.
        with self.assertRaisesRegex(PermissionError, "Operation not permitted"):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1084
1085
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb hierarchy."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira/dirb and return both paths, outermost first.
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Nothing but empty directories: the whole chain is removed.
        outer, inner = self._make_nested_dirs()
        os.removedirs(inner)
        for gone in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the pruning at that level.
        outer, inner = self._make_nested_dirs()
        blocker = os.path.join(outer, 'file.txt')
        with open(blocker, 'w') as stream:
            stream.write('text')
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for kept in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in the leaf directory: removedirs() fails and removes
        # nothing at all.
        outer, inner = self._make_nested_dirs()
        blocker = os.path.join(inner, 'file.txt')
        with open(blocker, 'w') as stream:
            stream.write('text')
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for kept in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1127
1128
class DevNullTests(unittest.TestCase):
    """Tests for the null device named by os.devnull."""

    def test_devnull(self):
        # Whatever is written to the null device is discarded ...
        # (the with statement closes the file; no explicit close() needed)
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        # ... and reading from it reports end of file immediately.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001136
Andrew Svetlov405faed2012-12-25 12:18:09 +02001137
Guido van Rossume7ba4952007-06-06 23:52:48 +00001138class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001139 def test_urandom_length(self):
1140 self.assertEqual(len(os.urandom(0)), 0)
1141 self.assertEqual(len(os.urandom(1)), 1)
1142 self.assertEqual(len(os.urandom(10)), 10)
1143 self.assertEqual(len(os.urandom(100)), 100)
1144 self.assertEqual(len(os.urandom(1000)), 1000)
1145
1146 def test_urandom_value(self):
1147 data1 = os.urandom(16)
1148 data2 = os.urandom(16)
1149 self.assertNotEqual(data1, data2)
1150
1151 def get_urandom_subprocess(self, count):
1152 code = '\n'.join((
1153 'import os, sys',
1154 'data = os.urandom(%s)' % count,
1155 'sys.stdout.buffer.write(data)',
1156 'sys.stdout.buffer.flush()'))
1157 out = assert_python_ok('-c', code)
1158 stdout = out[1]
1159 self.assertEqual(len(stdout), 16)
1160 return stdout
1161
1162 def test_urandom_subprocess(self):
1163 data1 = self.get_urandom_subprocess(16)
1164 data2 = self.get_urandom_subprocess(16)
1165 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001166
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001167
# True when sysconfig reports the HAVE_GETENTROPY build variable as 1.
HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)
1169
@unittest.skipIf(HAVE_GETENTROPY,
                 "getentropy() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    # Tests for how os.urandom() handles its file descriptor.  Every
    # scenario runs in a child interpreter through assert_python_ok(), so
    # descriptor limits and closed descriptors never affect this process.

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        #
        # The child lowers its soft RLIMIT_NOFILE to 1 and then expects
        # os.urandom() to raise OSError with errno EMFILE.
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        #
        # The child calls urandom() once, closes descriptors 3..255 and
        # calls urandom() again; the test only requires the child to
        # succeed, the captured output itself is not inspected.
        code = """if 1:
            import os
            import sys
            os.urandom(4)
            os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        rc, out, err = assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        #
        # TESTFN is filled with a constant byte: if urandom() kept reading
        # from the replaced descriptor, both 4-byte chunks would be equal.
        with open(support.TESTFN, 'wb') as f:
            f.write(b"x" * 256)
        self.addCleanup(os.unlink, support.TESTFN)
        code = """if 1:
            import os
            import sys
            os.urandom(4)
            for fd in range(3, 256):
                try:
                    os.close(fd)
                except OSError:
                    pass
                else:
                    # Found the urandom fd (XXX hopefully)
                    break
            os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        # Two chunks of 4 bytes each, which must differ from one another ...
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # ... and a second run must not reproduce the output of the first.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1238
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001239
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001240@contextlib.contextmanager
1241def _execvpe_mockup(defpath=None):
1242 """
1243 Stubs out execv and execve functions when used as context manager.
1244 Records exec calls. The mock execv and execve functions always raise an
1245 exception as they would normally never return.
1246 """
1247 # A list of tuples containing (function name, first arg, args)
1248 # of calls to execv or execve that have been made.
1249 calls = []
1250
1251 def mock_execv(name, *args):
1252 calls.append(('execv', name, args))
1253 raise RuntimeError("execv called")
1254
1255 def mock_execve(name, *args):
1256 calls.append(('execve', name, args))
1257 raise OSError(errno.ENOTDIR, "execve called")
1258
1259 try:
1260 orig_execv = os.execv
1261 orig_execve = os.execve
1262 orig_defpath = os.defpath
1263 os.execv = mock_execv
1264 os.execve = mock_execve
1265 if defpath is not None:
1266 os.defpath = defpath
1267 yield calls
1268 finally:
1269 os.execv = orig_execv
1270 os.execve = orig_execve
1271 os.defpath = orig_defpath
1272
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper.

    The _execvpe() tests never replace the process: os.execv and os.execve
    are stubbed out with _execvpe_mockup(), which records each call and
    raises instead of executing anything.
    """

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        # A program that cannot be found must raise OSError.
        self.assertRaises(OSError, os.execvpe, 'no such app-',
                          ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list is rejected with ValueError.
        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Shared body for the str and bytes variants; test_type is either
        # str or bytes and selects the type of the program name.
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # The path-search assertions below compare against this value:
            # the fsencode()d path everywhere except on "nt".
            if os.name != "nt":
                native_fullpath = os.fsencode(fullpath)
            else:
                native_fullpath = fullpath
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            # The execv stub raises RuntimeError.
            self.assertRaises(RuntimeError,
                os._execvpe, fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            # The execve stub raises OSError.
            self.assertRaises(OSError,
                os._execvpe, program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(calls[0],
                ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            # The PATH key has the same type as the program name.
            if test_type is bytes:
                env_path[b'PATH'] = program_path
            else:
                env_path['PATH'] = program_path
            self.assertRaises(OSError,
                os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(calls[0],
                ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        # Despite its name, this also runs the bytes variant, except on
        # "nt".
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001336
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001337
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that failing path operations raise OSError on Windows.

    Apart from test_mkdir (which creates the file itself), the tests call
    the os function on support.TESTFN without creating it first.
    """

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # os.mkdir() must fail when the name is already taken by a file.
        stream = open(support.TESTFN, "w")
        try:
            # The file is closed on leaving the ``with`` block, i.e. before
            # it is unlinked below.
            with stream:
                with self.assertRaises(OSError):
                    os.mkdir(support.TESTFN)
        finally:
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001362
class TestInvalidFD(unittest.TestCase):
    """Check how fd-based os functions react to an invalid descriptor.

    Most tests go through check(), which requires an OSError whose errno
    is EBADF.
    """

    # Functions taking the descriptor as their only argument; one test
    # method per name is generated below.
    # "close" is deliberately left out of this list because it doesn't
    # raise an exception on some platforms.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]

    def get_single(name):
        # Build a test method for os.<name>; the generated test does
        # nothing when the platform lacks that function.
        def helper(self):
            if hasattr(os, name):
                self.check(getattr(os, name))
        return helper

    for f in singles:
        locals()["test_" + f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require OSError(EBADF).
        try:
            f(support.make_bad_fd(), *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
            return
        self.fail("%r didn't raise a OSError with a bad file descriptor"
                  % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        bad_fd = support.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd + i)
            except OSError:
                continue
            # fd + i is a live descriptor: the invalid range stops here.
            break
        if i < 2:
            self.skipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertIsNone(os.closerange(fd, fd + i - 1))

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        self.check(os.readv, [bytearray(10)])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1461
Brian Curtin1b9df392010-11-24 20:24:31 +00001462
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files a test left behind.
        for leftover in (self.file1, self.file2):
            if os.path.exists(leftover):
                os.unlink(leftover)

    def _test_link(self, file1, file2):
        # Create *file1*, hard-link it as *file2*, then verify that both
        # names refer to the same open file.
        with open(file1, "w") as stream:
            stream.write("test")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)

        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(encoding),
                        self.file2.encode(encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            self.skipTest("Unable to encode for this platform.")

        # Update the attributes so that tearDown() removes the renamed files.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1499
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Error handling of the os.set*uid() / os.set*gid() family.

    Each test checks that an id of 1<<32 raises OverflowError.  Switching
    to id 0 must raise OSError; that check is skipped when running as uid 0
    and, for the group functions, also when HAVE_WHEEL_GROUP is set.
    """

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # The overflow must be detected in either argument position.
        with self.assertRaises(OverflowError):
            os.setreuid(1 << 32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1 << 32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        script = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', script])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        # The overflow must be detected in either argument position.
        with self.assertRaises(OverflowError):
            os.setregid(1 << 32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1 << 32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        script = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', script])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001555
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Exercise listdir/open/stat/statvfs on non-ASCII file names.

    setUp() creates a directory holding one empty file per test name that
    os.fsencode() accepts; self.unicodefn is the set of the decoded names.
    """

    def setUp(self):
        # Prefer the most "difficult" directory name that support offers.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        # Keep only the names that the filesystem encoding can represent.
        encoded_names = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            encoded_names.append(encoded)
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in encoded_names:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so clean up here
            # before re-raising.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            without_argument = set(os.listdir())
            self.assertEqual(without_argument, set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Every created file must be openable through its decoded name.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            os.stat(os.path.join(self.dir, name))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001627
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows (exit codes and console events)."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_tries * 0.1 seconds.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached on timeout or when the child exited before writing.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait until it
        # flags readiness through a shared one-byte mmap, send it *event*
        # and fail (naming *name*) if it is still running afterwards.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping once the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # The loop also ends when the child has already exited; there is
            # nothing left to kill in that case.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # code of 0 also means that it stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1742
1743
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0, SUB1 (directories) and FILE0,
        # FILE1 (files); self.created_paths holds their sorted base names.
        self.created_paths = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n"
                             % file_path)
            self.created_paths += [dir_name, file_name]
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        self.assertEqual(listed, list(map(os.fsencode, self.created_paths)))

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)
        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        self.assertEqual(listed, list(map(os.fsencode, self.created_paths)))
1788
1789
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for file and directory symbolic links on Windows."""

    # Link names are relative, so links are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Sanity-check the environment: targets exist, link names are free.
        assert os.path.exists(self.dirlink_target)
        assert os.path.exists(self.filelink_target)
        assert not os.path.exists(self.dirlink)
        assert not os.path.exists(self.filelink)
        assert not os.path.exists(self.missing_link)

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): the missing link's target does not exist, so exists()
        # would not see the link itself.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() on the link must equal stat() on the target and differ
        # from lstat() on the link, for both str and bytes link names.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # os.stat() through a relative symlink must work from the link's
        # own directory as well as from directories above and below it.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            # Register the cleanup only once level1 has been created by this
            # test: an early failure then neither removes a directory the
            # test does not own nor hides the original error.
            self.addCleanup(shutil.rmtree, level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                # Leave the tree before it gets removed by the cleanup.
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
1907
Brian Curtind40e6f72010-07-08 21:39:08 +00001908
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for directory junctions created with _winapi.CreateJunction."""

    # The junction name is relative; it points at this file's directory.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Sanity-check the environment before any junction is created.
        assert os.path.exists(self.junction_target)
        assert not os.path.exists(self.junction)

    def tearDown(self):
        if not os.path.exists(self.junction):
            return
        # os.rmdir delegates to Windows' RemoveDirectoryW,
        # which removes junction points safely.
        os.rmdir(self.junction)

    def _make_junction(self):
        # Create the junction and verify that it is visible.
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

    def test_create_junction(self):
        self._make_junction()
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        self._make_junction()

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
1938
1939
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        # Raw docstring: the backslash in the diagram is not an escape.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion (rather than a bare ``assert``) so that the
        # check is still performed when Python runs with -O.
        self.assertTrue(os.path.isdir(src))
1970
1971
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() come back
        # unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding: skip it.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001985
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001986
Brett Cannonefb00c02012-02-29 18:31:31 -05001987
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        bogus_fd = 123456
        self.assertIsNone(os.device_encoding(bogus_fd))

    @unittest.skipUnless(
        os.isatty(0)
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The name reported for fd 0 must be one the codecs module knows.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2001
2002
class PidTests(unittest.TestCase):
    """Tests for process-id queries in the os module."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # A child interpreter prints its parent's pid; since this process
        # spawned it, that value must equal our own pid.
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        self.assertEqual(int(output), os.getpid())
2012
2013
# When this TestCase was introduced it caused at least two different errors
# on *nix buildbots, so it is skipped unconditionally for now to let the
# buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Tests for os.getlogin()."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2022
2023
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Bump this process's priority value by one, read it back, then
        # try to put the original value back.
        which = os.PRIO_PROCESS
        original = os.getpriority(which, os.getpid())
        os.setpriority(which, os.getpid(), original + 1)
        try:
            observed = os.getpriority(which, os.getpid())
            if original >= 19 and observed <= 19:
                # Starting at 19 or above, the increment cannot be
                # observed reliably, so there is nothing to assert.
                raise unittest.SkipTest(
                    "unable to reliably test setpriority at current nice level of %s" % original)
            self.assertEqual(observed, original + 1)
        finally:
            try:
                os.setpriority(which, os.getpid(), original)
            except OSError as exc:
                # EACCES while restoring is tolerated; any other error is
                # propagated.
                if exc.errno != errno.EACCES:
                    raise
2046
2047
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket served from its own thread via asyncore.

        The server accepts a connection, wraps it in a ``Handler`` that
        buffers every byte received, and keeps polling asyncore until it
        is stopped.  The most recent handler is available as
        ``handler_instance`` so a test can inspect what was received.
        """

        class Handler(asynchat.async_chat):
            """Channel for one accepted connection; accumulates input."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client reads before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                # Polled by SendfileTestServer.wait().
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            """Bind to *address* and start listening (thread not started)."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Record the address actually bound (the port may have been 0).
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True while the serving loop is supposed to keep going."""
            return self._active

        def start(self):
            """Start the thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the serving loop to exit and wait for the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            # Unblock start(), which is waiting on this event.
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # Hold the lock through a context manager so that it is
                # released even if asyncore.loop() raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
2131
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002132
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile(), run against a local SendfileTestServer.

    setUpClass writes DATA to support.TESTFN; each test then sends some or
    all of that file over a fresh client socket and compares it with what
    the server's handler collected.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # str.startswith() accepts a tuple, so one call covers every platform
    # for which headers/trailers are treated as unsupported.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        # Tests that call server.wait() have already stopped the server.
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries for as long as the call fails with EAGAIN or EBUSY; any
        other OSError (including ECONNRESET) is propagated.  *headers* and
        *trailers* are only forwarded when SUPPORT_HEADERS_TRAILERS is
        true.  Their defaults are immutable empty tuples rather than
        shared mutable lists.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                # EAGAIN / EBUSY: we have to retry sending the data.
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare lengths first so a size mismatch is reported concisely.
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # A negative offset must be rejected with EINVAL.
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the payload itself (after a cheap length check), as the
        # other tests do, instead of comparing hash() values.
        self.assertEqual(len(data), len(expected_data))
        self.assertEqual(data, expected_data)

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"
        # Register the cleanup before creating the file so that it is
        # removed even if writing or re-opening it fails.
        self.addCleanup(support.unlink, TESTFN2)
        with open(TESTFN2, 'wb') as f:
            f.write(file_data)
        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # With SF_NODISKIO the call may fail with EBUSY or EAGAIN; both
        # are accepted, anything else is an error.
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002311
2312
def supports_extended_attributes():
    """Return True if extended attributes are usable for these tests.

    The answer is False when os.setxattr is missing, when setting a
    ``user.*`` attribute on a scratch file fails with OSError, or when
    platform.release() reports a 2.6.x kernel older than 2.6.39.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        # Always remove the scratch file, whatever the probe's outcome.
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string so that \d is a regex escape rather than an invalid string
    # escape; the dots are escaped so they only match literal dots.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2328
2329
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr/setxattr/removexattr/listxattr.

    One shared scenario is run with the functions called on a path, on a
    path with follow_symlinks=False, and on a file descriptor.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the attribute scenario on a fresh support.TESTFN.

        *s* converts an attribute name to the type under test; the four
        callables take the file name first.  *kwargs* is forwarded to
        every get/set/remove call (not to listxattr).
        """
        path = support.TESTFN
        with open(path, "wb"):
            pass

        def expect_errno(code, func, *args):
            # Call func on the file and require an OSError with this errno.
            with self.assertRaises(OSError) as caught:
                func(path, *args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        def listed():
            return set(listxattr(path))

        # Reading an attribute that was never set fails with ENODATA.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))
        baseline = listxattr(path)
        self.assertIsInstance(baseline, list)

        # Create the attribute with an empty value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(baseline)
        expected.add("user.test")
        self.assertEqual(listed(), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")

        # XATTR_REPLACE works on an existing attribute.
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name and XATTR_REPLACE on a missing
        # one must both fail.
        expect_errno(errno.EEXIST, setxattr,
                     s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr,
                     s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(listed(), expected)

        # Removing one attribute leaves the other untouched.
        removexattr(path, s("user.test"), **kwargs)
        expect_errno(errno.ENODATA, getxattr, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(listed(), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # A 1024-byte value round-trips.
        setxattr(path, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), b"a"*1024)
        removexattr(path, s("user.test"), **kwargs)

        # Many attributes at once; these names are always passed as str.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(listed(), set(baseline) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario with str names, then again with bytes names."""
        def make_bytes(s):
            return bytes(s, "ascii")

        for convert in (str, make_bytes):
            if convert is make_bytes:
                # Start the second pass from a file without attributes.
                support.unlink(support.TESTFN)
            self._check_xattrs_str(convert, *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Adapt the path-based scenario to descriptors: each wrapper opens
        # the file and hands its fileno() to the os function.
        def getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb") as stream:
                os.setxattr(stream.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb") as stream:
                os.removexattr(stream.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2405
2406
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Check that bytes file names trigger DeprecationWarning on Windows."""

    def test_deprecated(self):
        import nt
        name = os.fsencode(support.TESTFN)
        # Each entry is a callable followed by the arguments to pass it.
        calls = (
            (nt._getfullpathname, name),
            (nt._isdir, name),
            (os.access, name, os.R_OK),
            (os.chdir, name),
            (os.chmod, name, 0o777),
            (os.getcwdb,),
            (os.link, name, name),
            (os.listdir, name),
            (os.lstat, name),
            (os.mkdir, name),
            (os.open, name, os.O_RDONLY),
            (os.rename, name, name),
            (os.rmdir, name),
            (os.startfile, name),
            (os.stat, name),
            (os.unlink, name),
            (os.utime, name),
        )
        with warnings.catch_warnings():
            # Turn the warning into an exception so assertRaises sees it.
            warnings.simplefilter("error", DeprecationWarning)
            for call in calls:
                func, args = call[0], call[1:]
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            self.assertRaises(DeprecationWarning, os.symlink, name, name)
2442
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002443
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Tests for os.get_terminal_size()."""

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the handle
            # cannot be retrieved; elsewhere only EINVAL and ENOTTY are
            # treated as "no terminal".
            no_terminal = (sys.platform == "win32"
                           or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not no_terminal:
                raise
            self.skipTest("failed to query terminal size")

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            output = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        fields = output.decode().split()
        # The two fields are swapped to match get_terminal_size()'s order.
        expected = (int(fields[1]), int(fields[0]))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            # Same tolerance as in test_does_not_crash.
            no_terminal = (sys.platform == "win32"
                           or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not no_terminal:
                raise
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
2486
2487
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed to the call."""

    def setUp(self):
        class Str(str):
            pass

        # Text names: the plain string plus a str-subclass instance.
        text_name = support.TESTFN_UNENCODABLE
        if text_name is None:
            text_name = support.TESTFN
        self.unicode_filenames = [text_name, Str(text_name)]

        # Bytes names: the bytes object plus a memoryview of it.
        raw_name = support.TESTFN_UNDECODABLE
        if raw_name is None:
            raw_name = os.fsencode(support.TESTFN)
        self.bytes_filenames = [raw_name, memoryview(raw_name)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"
        every = self.filenames
        only_bytes = self.bytes_filenames
        only_text = self.unicode_filenames

        # Each entry: (names to try, function, *extra positional args).
        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (only_bytes, os.rename, b"dst"),
                (only_bytes, os.replace, b"dst"),
                (only_text, os.rename, "dst"),
                (only_text, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (only_text, os.listdir,),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]
        if hasattr(os, "chown"):
            funcs.append((every, os.chown, 0, 0))
        if hasattr(os, "lchown"):
            funcs.append((every, os.lchown, 0, 0))
        if hasattr(os, "truncate"):
            funcs.append((every, os.truncate, 0))
        if hasattr(os, "chflags"):
            funcs.append((every, os.chflags, 0))
        if hasattr(os, "lchflags"):
            funcs.append((every, os.lchflags, 0))
        if hasattr(os, "chroot"):
            funcs.append((every, os.chroot,))
        if hasattr(os, "link"):
            if on_windows:
                funcs.append((only_bytes, os.link, b"dst"))
                funcs.append((only_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            if on_windows:
                funcs.append((only_text, os.readlink,))
            else:
                funcs.append((every, os.readlink,))

        for entry in funcs:
            names, func, extra = entry[0], entry[1], entry[2:]
            for name in names:
                try:
                    func(name, *extra)
                except OSError as exc:
                    # Identity, not equality: the original object must be
                    # attached to the exception.
                    self.assertIs(exc.filename, name)
                    continue
                self.fail("No exception thrown by {}".format(func))
2584
class CPUCountTests(unittest.TestCase):
    """Tests for os.cpu_count()."""

    def test_cpu_count(self):
        # None means the count is unknown; otherwise it must be a
        # positive int.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2593
Victor Stinnerdaf45552013-08-28 00:53:59 +02002594
class FDInheritanceTests(unittest.TestCase):
    """Tests for the inheritable flag of file descriptors."""

    def _open_readonly(self):
        # Open this source file read-only; the descriptor is closed
        # automatically when the test finishes.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_readonly()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_readonly()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        current = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, current & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_readonly()
        cloexec = fcntl.FD_CLOEXEC
        # FD_CLOEXEC is set on a freshly opened descriptor ...
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec, cloexec)

        # ... and cleared once the descriptor is made inheritable.
        os.set_inheritable(fd, True)
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec, 0)

    def test_open(self):
        fd = self._open_readonly()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertEqual(os.get_inheritable(read_end), False)
        self.assertEqual(os.get_inheritable(write_end), False)

    def test_dup(self):
        source = self._open_readonly()

        duplicate = os.dup(source)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = self._open_readonly()

        # inheritable by default
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source, target)
            self.assertEqual(os.get_inheritable(target), True)
        finally:
            os.close(target)

        # force non-inheritable
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source, target, inheritable=False)
            self.assertEqual(os.get_inheritable(target), False)
        finally:
            os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
2677
2678
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Tests for os.get_blocking() and os.set_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor reports blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, reading the flag each time.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
2692
2693
Yury Selivanov97e2e062014-09-26 12:33:06 -04002694
class ExportsTests(unittest.TestCase):
    """Sanity check of the names listed in os.__all__."""

    def test_os_all(self):
        # Both names must be listed in os.__all__.
        exported = os.__all__
        for name in ('open', 'walk'):
            self.assertIn(name, exported)
2699
2700
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()