blob: 70734ab7926d801d0969d57c0f2e0d107f9f11b4 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
R David Murrayf2ad1732014-12-25 18:36:56 -05007import getpass
Fred Drake38c2ef02001-07-17 20:52:51 +00008import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00009import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +000010import sys
Brian Curtineb24d742010-04-12 17:16:38 +000011import signal
12import subprocess
13import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000014import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000015from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000016import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000017import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import platform
19import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000020import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import asyncore
22import asynchat
23import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010024import itertools
25import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050026import locale
27import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070028import decimal
29import fractions
Christian Heimes25827622013-10-12 01:27:08 +020030import pickle
Victor Stinner4d6a3d62014-12-21 01:16:38 +010031import sysconfig
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000032try:
33 import threading
34except ImportError:
35 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020036try:
37 import resource
38except ImportError:
39 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020040try:
41 import fcntl
42except ImportError:
43 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010044try:
45 import _winapi
46except ImportError:
47 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020048try:
R David Murrayf2ad1732014-12-25 18:36:56 -050049 import grp
50 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
51 if hasattr(os, 'getgid'):
52 process_gid = os.getgid()
53 if process_gid not in groups:
54 groups.append(process_gid)
55except ImportError:
56 groups = []
57try:
58 import pwd
59 all_users = [u.pw_uid for u in pwd.getpwall()]
60except ImportError:
61 all_users = []
62try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020064except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020065 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020066
Georg Brandl2daf6ae2012-02-20 19:54:16 +010067from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000068
R David Murrayf2ad1732014-12-25 18:36:56 -050069root_in_posix = False
70if hasattr(os, 'geteuid'):
71 root_in_posix = (os.geteuid() == 0)
72
Victor Stinner034d0aa2012-06-05 01:22:15 +020073with warnings.catch_warnings():
74 warnings.simplefilter("ignore", DeprecationWarning)
75 os.stat_float_times(True)
Victor Stinner1aa54a42012-02-08 04:09:37 +010076st = os.stat(__file__)
77stat_supports_subsecond = (
78 # check if float and int timestamps are different
79 (st.st_atime != st[7])
80 or (st.st_mtime != st[8])
81 or (st.st_ctime != st[9]))
82
Mark Dickinson7cf03892010-04-16 13:45:35 +000083# Detect whether we're on a Linux system that uses the (now outdated
84# and unmaintained) linuxthreads threading library. There's an issue
85# when combining linuxthreads with a failed execv call: see
86# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020087if hasattr(sys, 'thread_info') and sys.thread_info.version:
88 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
89else:
90 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000091
Stefan Krahebee49a2013-01-17 15:31:00 +010092# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
93HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
94
Thomas Wouters0e3f5912006-08-11 14:57:12 +000095# Tests creating TESTFN
96class FileTests(unittest.TestCase):
97 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000098 if os.path.exists(support.TESTFN):
99 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000100 tearDown = setUp
101
102 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000103 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000104 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000105 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000106
Christian Heimesfdab48e2008-01-20 09:06:41 +0000107 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000108 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
109 # We must allocate two consecutive file descriptors, otherwise
110 # it will mess up other file descriptors (perhaps even the three
111 # standard ones).
112 second = os.dup(first)
113 try:
114 retries = 0
115 while second != first + 1:
116 os.close(first)
117 retries += 1
118 if retries > 10:
119 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000120 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000121 first, second = second, os.dup(second)
122 finally:
123 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000124 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000125 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000126 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000127
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000128 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000129 def test_rename(self):
130 path = support.TESTFN
131 old = sys.getrefcount(path)
132 self.assertRaises(TypeError, os.rename, path, 0)
133 new = sys.getrefcount(path)
134 self.assertEqual(old, new)
135
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000136 def test_read(self):
137 with open(support.TESTFN, "w+b") as fobj:
138 fobj.write(b"spam")
139 fobj.flush()
140 fd = fobj.fileno()
141 os.lseek(fd, 0, 0)
142 s = os.read(fd, 4)
143 self.assertEqual(type(s), bytes)
144 self.assertEqual(s, b"spam")
145
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200146 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200147 # Skip the test on 32-bit platforms: the number of bytes must fit in a
148 # Py_ssize_t type
149 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
150 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200151 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
152 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200153 with open(support.TESTFN, "wb") as fp:
154 fp.write(b'test')
155 self.addCleanup(support.unlink, support.TESTFN)
156
157 # Issue #21932: Make sure that os.read() does not raise an
158 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200159 with open(support.TESTFN, "rb") as fp:
160 data = os.read(fp.fileno(), size)
161
162 # The test does not try to read more than 2 GB at once because the
163 # operating system is free to return less bytes than requested.
164 self.assertEqual(data, b'test')
165
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000166 def test_write(self):
167 # os.write() accepts bytes- and buffer-like objects but not strings
168 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
169 self.assertRaises(TypeError, os.write, fd, "beans")
170 os.write(fd, b"bacon\n")
171 os.write(fd, bytearray(b"eggs\n"))
172 os.write(fd, memoryview(b"spam\n"))
173 os.close(fd)
174 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000175 self.assertEqual(fobj.read().splitlines(),
176 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000177
Victor Stinnere0daff12011-03-20 23:36:35 +0100178 def write_windows_console(self, *args):
179 retcode = subprocess.call(args,
180 # use a new console to not flood the test output
181 creationflags=subprocess.CREATE_NEW_CONSOLE,
182 # use a shell to hide the console window (SW_HIDE)
183 shell=True)
184 self.assertEqual(retcode, 0)
185
186 @unittest.skipUnless(sys.platform == 'win32',
187 'test specific to the Windows console')
188 def test_write_windows_console(self):
189 # Issue #11395: the Windows console returns an error (12: not enough
190 # space error) on writing into stdout if stdout mode is binary and the
191 # length is greater than 66,000 bytes (or less, depending on heap
192 # usage).
193 code = "print('x' * 100000)"
194 self.write_windows_console(sys.executable, "-c", code)
195 self.write_windows_console(sys.executable, "-u", "-c", code)
196
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000197 def fdopen_helper(self, *args):
198 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200199 f = os.fdopen(fd, *args)
200 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000201
202 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200203 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
204 os.close(fd)
205
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000206 self.fdopen_helper()
207 self.fdopen_helper('r')
208 self.fdopen_helper('r', 100)
209
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100210 def test_replace(self):
211 TESTFN2 = support.TESTFN + ".2"
212 with open(support.TESTFN, 'w') as f:
213 f.write("1")
214 with open(TESTFN2, 'w') as f:
215 f.write("2")
216 self.addCleanup(os.unlink, TESTFN2)
217 os.replace(support.TESTFN, TESTFN2)
218 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
219 with open(TESTFN2, 'r') as f:
220 self.assertEqual(f.read(), "1")
221
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200222
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000223# Test attributes on return values from os.*stat* family.
224class StatAttributeTests(unittest.TestCase):
225 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000226 os.mkdir(support.TESTFN)
227 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000228 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000229 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000230 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000231
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000232 def tearDown(self):
233 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000234 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000235
Serhiy Storchaka43767632013-11-03 21:31:38 +0200236 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000237 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000238 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000239
240 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000241 self.assertEqual(result[stat.ST_SIZE], 3)
242 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000243
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000244 # Make sure all the attributes are there
245 members = dir(result)
246 for name in dir(stat):
247 if name[:3] == 'ST_':
248 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000249 if name.endswith("TIME"):
250 def trunc(x): return int(x)
251 else:
252 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000253 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000254 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000255 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000256
Larry Hastings6fe20b32012-04-19 15:07:49 -0700257 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700258 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700259 for name in 'st_atime st_mtime st_ctime'.split():
260 floaty = int(getattr(result, name) * 100000)
261 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700262 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700263
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000264 try:
265 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200266 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000267 except IndexError:
268 pass
269
270 # Make sure that assignment fails
271 try:
272 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200273 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000274 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000275 pass
276
277 try:
278 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200279 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000280 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000281 pass
282
283 try:
284 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200285 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000286 except AttributeError:
287 pass
288
289 # Use the stat_result constructor with a too-short tuple.
290 try:
291 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200292 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000293 except TypeError:
294 pass
295
Ezio Melotti42da6632011-03-15 05:18:48 +0200296 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000297 try:
298 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
299 except TypeError:
300 pass
301
Antoine Pitrou38425292010-09-21 18:19:07 +0000302 def test_stat_attributes(self):
303 self.check_stat_attributes(self.fname)
304
305 def test_stat_attributes_bytes(self):
306 try:
307 fname = self.fname.encode(sys.getfilesystemencoding())
308 except UnicodeEncodeError:
309 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100310 with warnings.catch_warnings():
311 warnings.simplefilter("ignore", DeprecationWarning)
312 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000313
Christian Heimes25827622013-10-12 01:27:08 +0200314 def test_stat_result_pickle(self):
315 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200316 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
317 p = pickle.dumps(result, proto)
318 self.assertIn(b'stat_result', p)
319 if proto < 4:
320 self.assertIn(b'cos\nstat_result\n', p)
321 unpickled = pickle.loads(p)
322 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200323
Serhiy Storchaka43767632013-11-03 21:31:38 +0200324 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000325 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000326 try:
327 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000328 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000329 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000330 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200331 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000332
333 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000334 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000335
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000336 # Make sure all the attributes are there.
337 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
338 'ffree', 'favail', 'flag', 'namemax')
339 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000340 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000341
342 # Make sure that assignment really fails
343 try:
344 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200345 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000346 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000347 pass
348
349 try:
350 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200351 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000352 except AttributeError:
353 pass
354
355 # Use the constructor with a too-short tuple.
356 try:
357 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200358 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000359 except TypeError:
360 pass
361
Ezio Melotti42da6632011-03-15 05:18:48 +0200362 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000363 try:
364 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
365 except TypeError:
366 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000367
Christian Heimes25827622013-10-12 01:27:08 +0200368 @unittest.skipUnless(hasattr(os, 'statvfs'),
369 "need os.statvfs()")
370 def test_statvfs_result_pickle(self):
371 try:
372 result = os.statvfs(self.fname)
373 except OSError as e:
374 # On AtheOS, glibc always returns ENOSYS
375 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200376 self.skipTest('os.statvfs() failed with ENOSYS')
377
Serhiy Storchakabad12572014-12-15 14:03:42 +0200378 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
379 p = pickle.dumps(result, proto)
380 self.assertIn(b'statvfs_result', p)
381 if proto < 4:
382 self.assertIn(b'cos\nstatvfs_result\n', p)
383 unpickled = pickle.loads(p)
384 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200385
Thomas Wouters89f507f2006-12-13 04:49:30 +0000386 def test_utime_dir(self):
387 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000388 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000389 # round to int, because some systems may support sub-second
390 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000391 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
392 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000393 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000394
Larry Hastings76ad59b2012-05-03 00:30:07 -0700395 def _test_utime(self, filename, attr, utime, delta):
Brian Curtin0277aa32011-11-06 13:50:15 -0600396 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600397 # second argument. Check that the previous methods of passing
398 # a time tuple or None work in addition to no argument.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700399 st0 = os.stat(filename)
Brian Curtin52fbea12011-11-06 13:41:17 -0600400 # Doesn't set anything new, but sets the time tuple way
Larry Hastings76ad59b2012-05-03 00:30:07 -0700401 utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
402 # Setting the time to the time you just read, then reading again,
403 # should always return exactly the same times.
404 st1 = os.stat(filename)
405 self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
406 self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
Brian Curtin52fbea12011-11-06 13:41:17 -0600407 # Set to the current time in the old explicit way.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700408 os.utime(filename, None)
Brian Curtin52fbea12011-11-06 13:41:17 -0600409 st2 = os.stat(support.TESTFN)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700410 # Set to the current time in the new way
411 os.utime(filename)
412 st3 = os.stat(filename)
413 self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)
414
415 def test_utime(self):
416 def utime(file, times):
417 return os.utime(file, times)
418 self._test_utime(self.fname, getattr, utime, 10)
419 self._test_utime(support.TESTFN, getattr, utime, 10)
420
421
422 def _test_utime_ns(self, set_times_ns, test_dir=True):
423 def getattr_ns(o, attr):
424 return getattr(o, attr + "_ns")
425 ten_s = 10 * 1000 * 1000 * 1000
426 self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
427 if test_dir:
428 self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)
429
430 def test_utime_ns(self):
431 def utime_ns(file, times):
432 return os.utime(file, ns=times)
433 self._test_utime_ns(utime_ns)
434
Larry Hastings9cf065c2012-06-22 16:30:09 -0700435 requires_utime_dir_fd = unittest.skipUnless(
436 os.utime in os.supports_dir_fd,
437 "dir_fd support for utime required for this test.")
438 requires_utime_fd = unittest.skipUnless(
439 os.utime in os.supports_fd,
440 "fd support for utime required for this test.")
441 requires_utime_nofollow_symlinks = unittest.skipUnless(
442 os.utime in os.supports_follow_symlinks,
443 "follow_symlinks support for utime required for this test.")
Larry Hastings76ad59b2012-05-03 00:30:07 -0700444
Larry Hastings9cf065c2012-06-22 16:30:09 -0700445 @requires_utime_nofollow_symlinks
Larry Hastings76ad59b2012-05-03 00:30:07 -0700446 def test_lutimes_ns(self):
447 def lutimes_ns(file, times):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700448 return os.utime(file, ns=times, follow_symlinks=False)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700449 self._test_utime_ns(lutimes_ns)
450
Larry Hastings9cf065c2012-06-22 16:30:09 -0700451 @requires_utime_fd
Larry Hastings76ad59b2012-05-03 00:30:07 -0700452 def test_futimes_ns(self):
453 def futimes_ns(file, times):
454 with open(file, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700455 os.utime(f.fileno(), ns=times)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700456 self._test_utime_ns(futimes_ns, test_dir=False)
457
458 def _utime_invalid_arguments(self, name, arg):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700459 with self.assertRaises(ValueError):
Larry Hastings76ad59b2012-05-03 00:30:07 -0700460 getattr(os, name)(arg, (5, 5), ns=(5, 5))
461
462 def test_utime_invalid_arguments(self):
463 self._utime_invalid_arguments('utime', self.fname)
464
Brian Curtin52fbea12011-11-06 13:41:17 -0600465
Victor Stinner1aa54a42012-02-08 04:09:37 +0100466 @unittest.skipUnless(stat_supports_subsecond,
467 "os.stat() doesn't has a subsecond resolution")
Victor Stinnera2f7c002012-02-08 03:36:25 +0100468 def _test_utime_subsecond(self, set_time_func):
Victor Stinnerbe557de2012-02-08 03:01:11 +0100469 asec, amsec = 1, 901
470 atime = asec + amsec * 1e-3
Victor Stinnera2f7c002012-02-08 03:36:25 +0100471 msec, mmsec = 2, 901
Victor Stinnerbe557de2012-02-08 03:01:11 +0100472 mtime = msec + mmsec * 1e-3
473 filename = self.fname
Victor Stinnera2f7c002012-02-08 03:36:25 +0100474 os.utime(filename, (0, 0))
475 set_time_func(filename, atime, mtime)
Victor Stinner034d0aa2012-06-05 01:22:15 +0200476 with warnings.catch_warnings():
477 warnings.simplefilter("ignore", DeprecationWarning)
478 os.stat_float_times(True)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100479 st = os.stat(filename)
480 self.assertAlmostEqual(st.st_atime, atime, places=3)
481 self.assertAlmostEqual(st.st_mtime, mtime, places=3)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100482
Victor Stinnera2f7c002012-02-08 03:36:25 +0100483 def test_utime_subsecond(self):
484 def set_time(filename, atime, mtime):
485 os.utime(filename, (atime, mtime))
486 self._test_utime_subsecond(set_time)
487
Larry Hastings9cf065c2012-06-22 16:30:09 -0700488 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100489 def test_futimes_subsecond(self):
490 def set_time(filename, atime, mtime):
491 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700492 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100493 self._test_utime_subsecond(set_time)
494
Larry Hastings9cf065c2012-06-22 16:30:09 -0700495 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100496 def test_futimens_subsecond(self):
497 def set_time(filename, atime, mtime):
498 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700499 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100500 self._test_utime_subsecond(set_time)
501
Larry Hastings9cf065c2012-06-22 16:30:09 -0700502 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100503 def test_futimesat_subsecond(self):
504 def set_time(filename, atime, mtime):
505 dirname = os.path.dirname(filename)
506 dirfd = os.open(dirname, os.O_RDONLY)
507 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700508 os.utime(os.path.basename(filename), dir_fd=dirfd,
509 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100510 finally:
511 os.close(dirfd)
512 self._test_utime_subsecond(set_time)
513
Larry Hastings9cf065c2012-06-22 16:30:09 -0700514 @requires_utime_nofollow_symlinks
Victor Stinnera2f7c002012-02-08 03:36:25 +0100515 def test_lutimes_subsecond(self):
516 def set_time(filename, atime, mtime):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700517 os.utime(filename, (atime, mtime), follow_symlinks=False)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100518 self._test_utime_subsecond(set_time)
519
Larry Hastings9cf065c2012-06-22 16:30:09 -0700520 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100521 def test_utimensat_subsecond(self):
522 def set_time(filename, atime, mtime):
523 dirname = os.path.dirname(filename)
524 dirfd = os.open(dirname, os.O_RDONLY)
525 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700526 os.utime(os.path.basename(filename), dir_fd=dirfd,
527 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100528 finally:
529 os.close(dirfd)
530 self._test_utime_subsecond(set_time)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100531
Serhiy Storchaka43767632013-11-03 21:31:38 +0200532 # Restrict tests to Win32, since there is no guarantee other
Thomas Wouters89f507f2006-12-13 04:49:30 +0000533 # systems support centiseconds
Serhiy Storchaka43767632013-11-03 21:31:38 +0200534 def get_file_system(path):
535 if sys.platform == 'win32':
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000536 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000537 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000538 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000539 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000540 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000541 return buf.value
542
Serhiy Storchaka43767632013-11-03 21:31:38 +0200543 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
544 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
545 "requires NTFS")
546 def test_1565150(self):
547 t1 = 1159195039.25
548 os.utime(self.fname, (t1, t1))
549 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000550
Serhiy Storchaka43767632013-11-03 21:31:38 +0200551 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
552 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
553 "requires NTFS")
554 def test_large_time(self):
555 t1 = 5000000000 # some day in 2128
556 os.utime(self.fname, (t1, t1))
557 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000558
Serhiy Storchaka43767632013-11-03 21:31:38 +0200559 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
560 def test_1686475(self):
561 # Verify that an open file can be stat'ed
562 try:
563 os.stat(r"c:\pagefile.sys")
564 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600565 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200566 except OSError as e:
567 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000568
Serhiy Storchaka43767632013-11-03 21:31:38 +0200569 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
570 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
571 def test_15261(self):
572 # Verify that stat'ing a closed fd does not cause crash
573 r, w = os.pipe()
574 try:
575 os.stat(r) # should not raise error
576 finally:
577 os.close(r)
578 os.close(w)
579 with self.assertRaises(OSError) as ctx:
580 os.stat(r)
581 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100582
Zachary Ware63f277b2014-06-19 09:46:37 -0500583 def check_file_attributes(self, result):
584 self.assertTrue(hasattr(result, 'st_file_attributes'))
585 self.assertTrue(isinstance(result.st_file_attributes, int))
586 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
587
588 @unittest.skipUnless(sys.platform == "win32",
589 "st_file_attributes is Win32 specific")
590 def test_file_attributes(self):
591 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
592 result = os.stat(self.fname)
593 self.check_file_attributes(result)
594 self.assertEqual(
595 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
596 0)
597
598 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
599 result = os.stat(support.TESTFN)
600 self.check_file_attributes(result)
601 self.assertEqual(
602 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
603 stat.FILE_ATTRIBUTE_DIRECTORY)
604
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000605from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000606
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000607class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000608 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000609 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000610
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000611 def setUp(self):
612 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000613 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000614 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000615 for key, value in self._reference().items():
616 os.environ[key] = value
617
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000618 def tearDown(self):
619 os.environ.clear()
620 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000621 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000622 os.environb.clear()
623 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000624
Christian Heimes90333392007-11-01 19:08:42 +0000625 def _reference(self):
626 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
627
628 def _empty_mapping(self):
629 os.environ.clear()
630 return os.environ
631
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000632 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300633 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000634 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000635 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300636 os.environ.update(HELLO="World")
637 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
638 value = popen.read().strip()
639 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000640
Ezio Melottic7e139b2012-09-26 20:01:34 +0300641 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000642 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300643 with os.popen(
644 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
645 it = iter(popen)
646 self.assertEqual(next(it), "line1\n")
647 self.assertEqual(next(it), "line2\n")
648 self.assertEqual(next(it), "line3\n")
649 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000650
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000651 # Verify environ keys and values from the OS are of the
652 # correct str type.
653 def test_keyvalue_types(self):
654 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000655 self.assertEqual(type(key), str)
656 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000657
Christian Heimes90333392007-11-01 19:08:42 +0000658 def test_items(self):
659 for key, value in self._reference().items():
660 self.assertEqual(os.environ.get(key), value)
661
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000662 # Issue 7310
663 def test___repr__(self):
664 """Check that the repr() of os.environ looks like environ({...})."""
665 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000666 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
667 '{!r}: {!r}'.format(key, value)
668 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000669
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000670 def test_get_exec_path(self):
671 defpath_list = os.defpath.split(os.pathsep)
672 test_path = ['/monty', '/python', '', '/flying/circus']
673 test_env = {'PATH': os.pathsep.join(test_path)}
674
675 saved_environ = os.environ
676 try:
677 os.environ = dict(test_env)
678 # Test that defaulting to os.environ works.
679 self.assertSequenceEqual(test_path, os.get_exec_path())
680 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
681 finally:
682 os.environ = saved_environ
683
684 # No PATH environment variable
685 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
686 # Empty PATH environment variable
687 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
688 # Supplied PATH environment variable
689 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
690
Victor Stinnerb745a742010-05-18 17:17:23 +0000691 if os.supports_bytes_environ:
692 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000693 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000694 # ignore BytesWarning warning
695 with warnings.catch_warnings(record=True):
696 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000697 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000698 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000699 pass
700 else:
701 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000702
703 # bytes key and/or value
704 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
705 ['abc'])
706 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
707 ['abc'])
708 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
709 ['abc'])
710
711 @unittest.skipUnless(os.supports_bytes_environ,
712 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000713 def test_environb(self):
714 # os.environ -> os.environb
715 value = 'euro\u20ac'
716 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000717 value_bytes = value.encode(sys.getfilesystemencoding(),
718 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000719 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000720 msg = "U+20AC character is not encodable to %s" % (
721 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000722 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000723 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000724 self.assertEqual(os.environ['unicode'], value)
725 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000726
727 # os.environb -> os.environ
728 value = b'\xff'
729 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000730 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000731 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000732 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000733
Charles-François Natali2966f102011-11-26 11:32:46 +0100734 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
735 # #13415).
736 @support.requires_freebsd_version(7)
737 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100738 def test_unset_error(self):
739 if sys.platform == "win32":
740 # an environment variable is limited to 32,767 characters
741 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100742 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100743 else:
744 # "=" is not allowed in a variable name
745 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100746 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100747
Victor Stinner6d101392013-04-14 16:35:04 +0200748 def test_key_type(self):
749 missing = 'missingkey'
750 self.assertNotIn(missing, os.environ)
751
Victor Stinner839e5ea2013-04-14 16:43:03 +0200752 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200753 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200754 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200755 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200756
Victor Stinner839e5ea2013-04-14 16:43:03 +0200757 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200758 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200759 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200760 self.assertTrue(cm.exception.__suppress_context__)
761
Victor Stinner6d101392013-04-14 16:35:04 +0200762
Tim Petersc4e09402003-04-25 07:11:48 +0000763class WalkTests(unittest.TestCase):
764 """Tests for os.walk()."""
765
Victor Stinner0561c532015-03-12 10:28:24 +0100766 # Wrapper to hide minor differences between os.walk and os.fwalk
767 # to tests both functions with the same code base
768 def walk(self, directory, topdown=True, follow_symlinks=False):
769 walk_it = os.walk(directory,
770 topdown=topdown,
771 followlinks=follow_symlinks)
772 for root, dirs, files in walk_it:
773 yield (root, dirs, files)
774
Charles-François Natali7372b062012-02-05 15:15:38 +0100775 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100776 join = os.path.join
Tim Petersc4e09402003-04-25 07:11:48 +0000777
778 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000779 # TESTFN/
780 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000781 # tmp1
782 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000783 # tmp2
784 # SUB11/ no kids
785 # SUB2/ a file kid and a dirsymlink kid
786 # tmp3
787 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200788 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000789 # TEST2/
790 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100791 self.walk_path = join(support.TESTFN, "TEST1")
792 self.sub1_path = join(self.walk_path, "SUB1")
793 self.sub11_path = join(self.sub1_path, "SUB11")
794 sub2_path = join(self.walk_path, "SUB2")
795 tmp1_path = join(self.walk_path, "tmp1")
796 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000797 tmp3_path = join(sub2_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100798 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000799 t2_path = join(support.TESTFN, "TEST2")
800 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200801 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000802
803 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100804 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000805 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000806 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100807
Guido van Rossumd8faa362007-04-27 19:54:29 +0000808 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000809 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000810 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
811 f.close()
812
Victor Stinner0561c532015-03-12 10:28:24 +0100813 if support.can_symlink():
814 os.symlink(os.path.abspath(t2_path), self.link_path)
815 os.symlink('broken', broken_link_path, True)
816 self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
817 else:
818 self.sub2_tree = (sub2_path, [], ["tmp3"])
819
820 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000821 # Walk top-down.
Victor Stinner0561c532015-03-12 10:28:24 +0100822 all = list(os.walk(self.walk_path))
823
Tim Petersc4e09402003-04-25 07:11:48 +0000824 self.assertEqual(len(all), 4)
825 # We can't know which order SUB1 and SUB2 will appear in.
826 # Not flipped: TESTFN, SUB1, SUB11, SUB2
827 # flipped: TESTFN, SUB2, SUB1, SUB11
828 flipped = all[0][1][0] != "SUB1"
829 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200830 all[3 - 2 * flipped][-1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100831 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
832 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
833 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
834 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000835
Victor Stinner0561c532015-03-12 10:28:24 +0100836 def test_walk_prune(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000837 # Prune the search.
838 all = []
Victor Stinner0561c532015-03-12 10:28:24 +0100839 for root, dirs, files in self.walk(self.walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000840 all.append((root, dirs, files))
841 # Don't descend into SUB1.
842 if 'SUB1' in dirs:
843 # Note that this also mutates the dirs we appended to all!
844 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000845
Victor Stinner0561c532015-03-12 10:28:24 +0100846 self.assertEqual(len(all), 2)
847 self.assertEqual(all[0],
848 (self.walk_path, ["SUB2"], ["tmp1"]))
849
850 all[1][-1].sort()
851 self.assertEqual(all[1], self.sub2_tree)
852
853 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000854 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100855 all = list(self.walk(self.walk_path, topdown=False))
856
Tim Petersc4e09402003-04-25 07:11:48 +0000857 self.assertEqual(len(all), 4)
858 # We can't know which order SUB1 and SUB2 will appear in.
859 # Not flipped: SUB11, SUB1, SUB2, TESTFN
860 # flipped: SUB2, SUB11, SUB1, TESTFN
861 flipped = all[3][1][0] != "SUB1"
862 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200863 all[2 - 2 * flipped][-1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100864 self.assertEqual(all[3],
865 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
866 self.assertEqual(all[flipped],
867 (self.sub11_path, [], []))
868 self.assertEqual(all[flipped + 1],
869 (self.sub1_path, ["SUB11"], ["tmp2"]))
870 self.assertEqual(all[2 - 2 * flipped],
871 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000872
Victor Stinner0561c532015-03-12 10:28:24 +0100873 def test_walk_symlink(self):
874 if not support.can_symlink():
875 self.skipTest("need symlink support")
876
877 # Walk, following symlinks.
878 walk_it = self.walk(self.walk_path, follow_symlinks=True)
879 for root, dirs, files in walk_it:
880 if root == self.link_path:
881 self.assertEqual(dirs, [])
882 self.assertEqual(files, ["tmp4"])
883 break
884 else:
885 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000886
887 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000888 # Tear everything down. This is a decent use for bottom-up on
889 # Windows, which doesn't have a recursive delete command. The
890 # (not so) subtlety is that rmdir will fail unless the dir's
891 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000892 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000893 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000894 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000895 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000896 dirname = os.path.join(root, name)
897 if not os.path.islink(dirname):
898 os.rmdir(dirname)
899 else:
900 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000901 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000902
Charles-François Natali7372b062012-02-05 15:15:38 +0100903
904@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
905class FwalkTests(WalkTests):
906 """Tests for os.fwalk()."""
907
Victor Stinner0561c532015-03-12 10:28:24 +0100908 def walk(self, directory, topdown=True, follow_symlinks=False):
909 walk_it = os.fwalk(directory,
910 topdown=topdown,
911 follow_symlinks=follow_symlinks)
912 for root, dirs, files, root_fd in walk_it:
913 yield (root, dirs, files)
914
915
Larry Hastingsc48fe982012-06-25 04:49:05 -0700916 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
917 """
918 compare with walk() results.
919 """
Larry Hastingsb4038062012-07-15 10:57:38 -0700920 walk_kwargs = walk_kwargs.copy()
921 fwalk_kwargs = fwalk_kwargs.copy()
922 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
923 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
924 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -0700925
Charles-François Natali7372b062012-02-05 15:15:38 +0100926 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -0700927 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100928 expected[root] = (set(dirs), set(files))
929
Larry Hastingsc48fe982012-06-25 04:49:05 -0700930 for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100931 self.assertIn(root, expected)
932 self.assertEqual(expected[root], (set(dirs), set(files)))
933
Larry Hastingsc48fe982012-06-25 04:49:05 -0700934 def test_compare_to_walk(self):
935 kwargs = {'top': support.TESTFN}
936 self._compare_to_walk(kwargs, kwargs)
937
Charles-François Natali7372b062012-02-05 15:15:38 +0100938 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -0700939 try:
940 fd = os.open(".", os.O_RDONLY)
941 walk_kwargs = {'top': support.TESTFN}
942 fwalk_kwargs = walk_kwargs.copy()
943 fwalk_kwargs['dir_fd'] = fd
944 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
945 finally:
946 os.close(fd)
947
948 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +0100949 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -0700950 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
951 args = support.TESTFN, topdown, None
952 for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +0100953 # check that the FD is valid
954 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700955 # redundant check
956 os.stat(rootfd)
957 # check that listdir() returns consistent information
958 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100959
960 def test_fd_leak(self):
961 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
962 # we both check that calling fwalk() a large number of times doesn't
963 # yield EMFILE, and that the minimum allocated FD hasn't changed.
964 minfd = os.dup(1)
965 os.close(minfd)
966 for i in range(256):
967 for x in os.fwalk(support.TESTFN):
968 pass
969 newfd = os.dup(1)
970 self.addCleanup(os.close, newfd)
971 self.assertEqual(newfd, minfd)
972
973 def tearDown(self):
974 # cleanup
975 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
976 for name in files:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700977 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100978 for name in dirs:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700979 st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
Larry Hastingsb698d8e2012-06-23 16:55:07 -0700980 if stat.S_ISDIR(st.st_mode):
981 os.rmdir(name, dir_fd=rootfd)
982 else:
983 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100984 os.rmdir(support.TESTFN)
985
986
Guido van Rossume7ba4952007-06-06 23:52:48 +0000987class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000988 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000989 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000990
991 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000992 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000993 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
994 os.makedirs(path) # Should work
995 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
996 os.makedirs(path)
997
998 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000999 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001000 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1001 os.makedirs(path)
1002 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1003 'dir5', 'dir6')
1004 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001005
Terry Reedy5a22b652010-12-02 07:05:56 +00001006 def test_exist_ok_existing_directory(self):
1007 path = os.path.join(support.TESTFN, 'dir1')
1008 mode = 0o777
1009 old_mask = os.umask(0o022)
1010 os.makedirs(path, mode)
1011 self.assertRaises(OSError, os.makedirs, path, mode)
1012 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001013 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001014 os.makedirs(path, mode=mode, exist_ok=True)
1015 os.umask(old_mask)
1016
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001017 def test_exist_ok_s_isgid_directory(self):
1018 path = os.path.join(support.TESTFN, 'dir1')
1019 S_ISGID = stat.S_ISGID
1020 mode = 0o777
1021 old_mask = os.umask(0o022)
1022 try:
1023 existing_testfn_mode = stat.S_IMODE(
1024 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001025 try:
1026 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001027 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001028 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001029 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1030 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1031 # The os should apply S_ISGID from the parent dir for us, but
1032 # this test need not depend on that behavior. Be explicit.
1033 os.makedirs(path, mode | S_ISGID)
1034 # http://bugs.python.org/issue14992
1035 # Should not fail when the bit is already set.
1036 os.makedirs(path, mode, exist_ok=True)
1037 # remove the bit.
1038 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001039 # May work even when the bit is not already set when demanded.
1040 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001041 finally:
1042 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001043
1044 def test_exist_ok_existing_regular_file(self):
1045 base = support.TESTFN
1046 path = os.path.join(support.TESTFN, 'dir1')
1047 f = open(path, 'w')
1048 f.write('abc')
1049 f.close()
1050 self.assertRaises(OSError, os.makedirs, path)
1051 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1052 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1053 os.remove(path)
1054
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001055 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001056 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001057 'dir4', 'dir5', 'dir6')
1058 # If the tests failed, the bottom-most directory ('../dir6')
1059 # may not have been created, so we look for the outermost directory
1060 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001061 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001062 path = os.path.dirname(path)
1063
1064 os.removedirs(path)
1065
Andrew Svetlov405faed2012-12-25 12:18:09 +02001066
R David Murrayf2ad1732014-12-25 18:36:56 -05001067@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1068class ChownFileTests(unittest.TestCase):
1069
1070 def setUpClass():
1071 os.mkdir(support.TESTFN)
1072
1073 def test_chown_uid_gid_arguments_must_be_index(self):
1074 stat = os.stat(support.TESTFN)
1075 uid = stat.st_uid
1076 gid = stat.st_gid
1077 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1078 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1079 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1080 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1081 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1082
1083 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1084 def test_chown(self):
1085 gid_1, gid_2 = groups[:2]
1086 uid = os.stat(support.TESTFN).st_uid
1087 os.chown(support.TESTFN, uid, gid_1)
1088 gid = os.stat(support.TESTFN).st_gid
1089 self.assertEqual(gid, gid_1)
1090 os.chown(support.TESTFN, uid, gid_2)
1091 gid = os.stat(support.TESTFN).st_gid
1092 self.assertEqual(gid, gid_2)
1093
1094 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1095 "test needs root privilege and more than one user")
1096 def test_chown_with_root(self):
1097 uid_1, uid_2 = all_users[:2]
1098 gid = os.stat(support.TESTFN).st_gid
1099 os.chown(support.TESTFN, uid_1, gid)
1100 uid = os.stat(support.TESTFN).st_uid
1101 self.assertEqual(uid, uid_1)
1102 os.chown(support.TESTFN, uid_2, gid)
1103 uid = os.stat(support.TESTFN).st_uid
1104 self.assertEqual(uid, uid_2)
1105
1106 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1107 "test needs non-root account and more than one user")
1108 def test_chown_without_permission(self):
1109 uid_1, uid_2 = all_users[:2]
1110 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001111 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001112 os.chown(support.TESTFN, uid_1, gid)
1113 os.chown(support.TESTFN, uid_2, gid)
1114
1115 def tearDownClass():
1116 os.rmdir(support.TESTFN)
1117
1118
Andrew Svetlov405faed2012-12-25 12:18:09 +02001119class RemoveDirsTests(unittest.TestCase):
1120 def setUp(self):
1121 os.makedirs(support.TESTFN)
1122
1123 def tearDown(self):
1124 support.rmtree(support.TESTFN)
1125
1126 def test_remove_all(self):
1127 dira = os.path.join(support.TESTFN, 'dira')
1128 os.mkdir(dira)
1129 dirb = os.path.join(dira, 'dirb')
1130 os.mkdir(dirb)
1131 os.removedirs(dirb)
1132 self.assertFalse(os.path.exists(dirb))
1133 self.assertFalse(os.path.exists(dira))
1134 self.assertFalse(os.path.exists(support.TESTFN))
1135
1136 def test_remove_partial(self):
1137 dira = os.path.join(support.TESTFN, 'dira')
1138 os.mkdir(dira)
1139 dirb = os.path.join(dira, 'dirb')
1140 os.mkdir(dirb)
1141 with open(os.path.join(dira, 'file.txt'), 'w') as f:
1142 f.write('text')
1143 os.removedirs(dirb)
1144 self.assertFalse(os.path.exists(dirb))
1145 self.assertTrue(os.path.exists(dira))
1146 self.assertTrue(os.path.exists(support.TESTFN))
1147
1148 def test_remove_nothing(self):
1149 dira = os.path.join(support.TESTFN, 'dira')
1150 os.mkdir(dira)
1151 dirb = os.path.join(dira, 'dirb')
1152 os.mkdir(dirb)
1153 with open(os.path.join(dirb, 'file.txt'), 'w') as f:
1154 f.write('text')
1155 with self.assertRaises(OSError):
1156 os.removedirs(dirb)
1157 self.assertTrue(os.path.exists(dirb))
1158 self.assertTrue(os.path.exists(dira))
1159 self.assertTrue(os.path.exists(support.TESTFN))
1160
1161
Guido van Rossume7ba4952007-06-06 23:52:48 +00001162class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001163 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +02001164 with open(os.devnull, 'wb') as f:
1165 f.write(b'hello')
1166 f.close()
1167 with open(os.devnull, 'rb') as f:
1168 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001169
Andrew Svetlov405faed2012-12-25 12:18:09 +02001170
Guido van Rossume7ba4952007-06-06 23:52:48 +00001171class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001172 def test_urandom_length(self):
1173 self.assertEqual(len(os.urandom(0)), 0)
1174 self.assertEqual(len(os.urandom(1)), 1)
1175 self.assertEqual(len(os.urandom(10)), 10)
1176 self.assertEqual(len(os.urandom(100)), 100)
1177 self.assertEqual(len(os.urandom(1000)), 1000)
1178
1179 def test_urandom_value(self):
1180 data1 = os.urandom(16)
1181 data2 = os.urandom(16)
1182 self.assertNotEqual(data1, data2)
1183
1184 def get_urandom_subprocess(self, count):
1185 code = '\n'.join((
1186 'import os, sys',
1187 'data = os.urandom(%s)' % count,
1188 'sys.stdout.buffer.write(data)',
1189 'sys.stdout.buffer.flush()'))
1190 out = assert_python_ok('-c', code)
1191 stdout = out[1]
1192 self.assertEqual(len(stdout), 16)
1193 return stdout
1194
1195 def test_urandom_subprocess(self):
1196 data1 = self.get_urandom_subprocess(16)
1197 data2 = self.get_urandom_subprocess(16)
1198 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001199
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001200
1201HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)
Victor Stinner9eb57c52015-03-19 22:21:49 +01001202HAVE_GETRANDOM = (sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001203
1204@unittest.skipIf(HAVE_GETENTROPY,
1205 "getentropy() does not use a file descriptor")
Victor Stinner9eb57c52015-03-19 22:21:49 +01001206@unittest.skipIf(HAVE_GETRANDOM,
1207 "getrandom() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001208class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001209 @unittest.skipUnless(resource, "test requires the resource module")
1210 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001211 # Check urandom() failing when it is not able to open /dev/random.
1212 # We spawn a new process to make the test more robust (if getrlimit()
1213 # failed to restore the file descriptor limit after this, the whole
1214 # test suite would crash; this actually happened on the OS X Tiger
1215 # buildbot).
1216 code = """if 1:
1217 import errno
1218 import os
1219 import resource
1220
1221 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1222 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1223 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001224 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001225 except OSError as e:
1226 assert e.errno == errno.EMFILE, e.errno
1227 else:
1228 raise AssertionError("OSError not raised")
1229 """
1230 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001231
Antoine Pitroue472aea2014-04-26 14:33:03 +02001232 def test_urandom_fd_closed(self):
1233 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1234 # closed.
1235 code = """if 1:
1236 import os
1237 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001238 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001239 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001240 with test.support.SuppressCrashReport():
1241 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001242 sys.stdout.buffer.write(os.urandom(4))
1243 """
1244 rc, out, err = assert_python_ok('-Sc', code)
1245
1246 def test_urandom_fd_reopened(self):
1247 # Issue #21207: urandom() should detect its fd to /dev/urandom
1248 # changed to something else, and reopen it.
1249 with open(support.TESTFN, 'wb') as f:
1250 f.write(b"x" * 256)
1251 self.addCleanup(os.unlink, support.TESTFN)
1252 code = """if 1:
1253 import os
1254 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001255 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001256 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001257 with test.support.SuppressCrashReport():
1258 for fd in range(3, 256):
1259 try:
1260 os.close(fd)
1261 except OSError:
1262 pass
1263 else:
1264 # Found the urandom fd (XXX hopefully)
1265 break
1266 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001267 with open({TESTFN!r}, 'rb') as f:
1268 os.dup2(f.fileno(), fd)
1269 sys.stdout.buffer.write(os.urandom(4))
1270 sys.stdout.buffer.write(os.urandom(4))
1271 """.format(TESTFN=support.TESTFN)
1272 rc, out, err = assert_python_ok('-Sc', code)
1273 self.assertEqual(len(out), 8)
1274 self.assertNotEqual(out[0:4], out[4:8])
1275 rc, out2, err2 = assert_python_ok('-Sc', code)
1276 self.assertEqual(len(out2), 8)
1277 self.assertNotEqual(out2, out)
1278
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001279
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001280@contextlib.contextmanager
1281def _execvpe_mockup(defpath=None):
1282 """
1283 Stubs out execv and execve functions when used as context manager.
1284 Records exec calls. The mock execv and execve functions always raise an
1285 exception as they would normally never return.
1286 """
1287 # A list of tuples containing (function name, first arg, args)
1288 # of calls to execv or execve that have been made.
1289 calls = []
1290
1291 def mock_execv(name, *args):
1292 calls.append(('execv', name, args))
1293 raise RuntimeError("execv called")
1294
1295 def mock_execve(name, *args):
1296 calls.append(('execve', name, args))
1297 raise OSError(errno.ENOTDIR, "execve called")
1298
1299 try:
1300 orig_execv = os.execv
1301 orig_execve = os.execve
1302 orig_defpath = os.defpath
1303 os.execv = mock_execv
1304 os.execve = mock_execve
1305 if defpath is not None:
1306 os.defpath = defpath
1307 yield calls
1308 finally:
1309 os.execv = orig_execv
1310 os.execve = orig_execve
1311 os.defpath = orig_defpath
1312
Guido van Rossume7ba4952007-06-06 23:52:48 +00001313class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001314 @unittest.skipIf(USING_LINUXTHREADS,
1315 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001316 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001317 self.assertRaises(OSError, os.execvpe, 'no such app-',
1318 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001319
Thomas Heller6790d602007-08-30 17:15:14 +00001320 def test_execvpe_with_bad_arglist(self):
1321 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1322
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001323 @unittest.skipUnless(hasattr(os, '_execvpe'),
1324 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001325 def _test_internal_execvpe(self, test_type):
1326 program_path = os.sep + 'absolutepath'
1327 if test_type is bytes:
1328 program = b'executable'
1329 fullpath = os.path.join(os.fsencode(program_path), program)
1330 native_fullpath = fullpath
1331 arguments = [b'progname', 'arg1', 'arg2']
1332 else:
1333 program = 'executable'
1334 arguments = ['progname', 'arg1', 'arg2']
1335 fullpath = os.path.join(program_path, program)
1336 if os.name != "nt":
1337 native_fullpath = os.fsencode(fullpath)
1338 else:
1339 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001340 env = {'spam': 'beans'}
1341
Victor Stinnerb745a742010-05-18 17:17:23 +00001342 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001343 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001344 self.assertRaises(RuntimeError,
1345 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001346 self.assertEqual(len(calls), 1)
1347 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1348
Victor Stinnerb745a742010-05-18 17:17:23 +00001349 # test os._execvpe() with a relative path:
1350 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001351 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001352 self.assertRaises(OSError,
1353 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001354 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001355 self.assertSequenceEqual(calls[0],
1356 ('execve', native_fullpath, (arguments, env)))
1357
1358 # test os._execvpe() with a relative path:
1359 # os.get_exec_path() reads the 'PATH' variable
1360 with _execvpe_mockup() as calls:
1361 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001362 if test_type is bytes:
1363 env_path[b'PATH'] = program_path
1364 else:
1365 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001366 self.assertRaises(OSError,
1367 os._execvpe, program, arguments, env=env_path)
1368 self.assertEqual(len(calls), 1)
1369 self.assertSequenceEqual(calls[0],
1370 ('execve', native_fullpath, (arguments, env_path)))
1371
1372 def test_internal_execvpe_str(self):
1373 self._test_internal_execvpe(str)
1374 if os.name != "nt":
1375 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001376
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001377
Serhiy Storchaka43767632013-11-03 21:31:38 +02001378@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001379class Win32ErrorTests(unittest.TestCase):
1380 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001381 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001382
1383 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001384 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001385
1386 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001387 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001388
1389 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001390 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001391 try:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001392 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Benjamin Petersonf91df042009-02-13 02:50:59 +00001393 finally:
1394 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001395 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001396
1397 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001398 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001399
Thomas Wouters477c8d52006-05-27 19:21:47 +00001400 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001401 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001402
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001403class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001404 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001405 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1406 #singles.append("close")
1407 #We omit close because it doesn'r raise an exception on some platforms
1408 def get_single(f):
1409 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001410 if hasattr(os, f):
1411 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001412 return helper
1413 for f in singles:
1414 locals()["test_"+f] = get_single(f)
1415
Benjamin Peterson7522c742009-01-19 21:00:09 +00001416 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001417 try:
1418 f(support.make_bad_fd(), *args)
1419 except OSError as e:
1420 self.assertEqual(e.errno, errno.EBADF)
1421 else:
1422 self.fail("%r didn't raise a OSError with a bad file descriptor"
1423 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001424
Serhiy Storchaka43767632013-11-03 21:31:38 +02001425 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001426 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001427 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001428
Serhiy Storchaka43767632013-11-03 21:31:38 +02001429 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001430 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001431 fd = support.make_bad_fd()
1432 # Make sure none of the descriptors we are about to close are
1433 # currently valid (issue 6542).
1434 for i in range(10):
1435 try: os.fstat(fd+i)
1436 except OSError:
1437 pass
1438 else:
1439 break
1440 if i < 2:
1441 raise unittest.SkipTest(
1442 "Unable to acquire a range of invalid file descriptors")
1443 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001444
Serhiy Storchaka43767632013-11-03 21:31:38 +02001445 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001446 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001447 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001448
Serhiy Storchaka43767632013-11-03 21:31:38 +02001449 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001450 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001451 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001452
Serhiy Storchaka43767632013-11-03 21:31:38 +02001453 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001454 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001455 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001456
Serhiy Storchaka43767632013-11-03 21:31:38 +02001457 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001458 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001459 self.check(os.pathconf, "PC_NAME_MAX")
1460 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001461
Serhiy Storchaka43767632013-11-03 21:31:38 +02001462 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001463 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001464 self.check(os.truncate, 0)
1465 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001466
Serhiy Storchaka43767632013-11-03 21:31:38 +02001467 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001468 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001469 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001470
Serhiy Storchaka43767632013-11-03 21:31:38 +02001471 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001472 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001473 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001474
Victor Stinner57ddf782014-01-08 15:21:28 +01001475 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1476 def test_readv(self):
1477 buf = bytearray(10)
1478 self.check(os.readv, [buf])
1479
Serhiy Storchaka43767632013-11-03 21:31:38 +02001480 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001481 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001482 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001483
Serhiy Storchaka43767632013-11-03 21:31:38 +02001484 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001485 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001486 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001487
Victor Stinner57ddf782014-01-08 15:21:28 +01001488 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1489 def test_writev(self):
1490 self.check(os.writev, [b'abc'])
1491
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001492 def test_inheritable(self):
1493 self.check(os.get_inheritable)
1494 self.check(os.set_inheritable, True)
1495
1496 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1497 'needs os.get_blocking() and os.set_blocking()')
1498 def test_blocking(self):
1499 self.check(os.get_blocking)
1500 self.check(os.set_blocking, True)
1501
Brian Curtin1b9df392010-11-24 20:24:31 +00001502
1503class LinkTests(unittest.TestCase):
1504 def setUp(self):
1505 self.file1 = support.TESTFN
1506 self.file2 = os.path.join(support.TESTFN + "2")
1507
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001508 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001509 for file in (self.file1, self.file2):
1510 if os.path.exists(file):
1511 os.unlink(file)
1512
Brian Curtin1b9df392010-11-24 20:24:31 +00001513 def _test_link(self, file1, file2):
1514 with open(file1, "w") as f1:
1515 f1.write("test")
1516
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001517 with warnings.catch_warnings():
1518 warnings.simplefilter("ignore", DeprecationWarning)
1519 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001520 with open(file1, "r") as f1, open(file2, "r") as f2:
1521 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1522
1523 def test_link(self):
1524 self._test_link(self.file1, self.file2)
1525
1526 def test_link_bytes(self):
1527 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1528 bytes(self.file2, sys.getfilesystemencoding()))
1529
Brian Curtinf498b752010-11-30 15:54:04 +00001530 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001531 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001532 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001533 except UnicodeError:
1534 raise unittest.SkipTest("Unable to encode for this platform.")
1535
Brian Curtinf498b752010-11-30 15:54:04 +00001536 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001537 self.file2 = self.file1 + "2"
1538 self._test_link(self.file1, self.file2)
1539
Serhiy Storchaka43767632013-11-03 21:31:38 +02001540@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1541class PosixUidGidTests(unittest.TestCase):
1542 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1543 def test_setuid(self):
1544 if os.getuid() != 0:
1545 self.assertRaises(OSError, os.setuid, 0)
1546 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001547
Serhiy Storchaka43767632013-11-03 21:31:38 +02001548 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1549 def test_setgid(self):
1550 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1551 self.assertRaises(OSError, os.setgid, 0)
1552 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001553
Serhiy Storchaka43767632013-11-03 21:31:38 +02001554 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1555 def test_seteuid(self):
1556 if os.getuid() != 0:
1557 self.assertRaises(OSError, os.seteuid, 0)
1558 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001559
Serhiy Storchaka43767632013-11-03 21:31:38 +02001560 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1561 def test_setegid(self):
1562 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1563 self.assertRaises(OSError, os.setegid, 0)
1564 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001565
Serhiy Storchaka43767632013-11-03 21:31:38 +02001566 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1567 def test_setreuid(self):
1568 if os.getuid() != 0:
1569 self.assertRaises(OSError, os.setreuid, 0, 0)
1570 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1571 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001572
Serhiy Storchaka43767632013-11-03 21:31:38 +02001573 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1574 def test_setreuid_neg1(self):
1575 # Needs to accept -1. We run this in a subprocess to avoid
1576 # altering the test runner's process state (issue8045).
1577 subprocess.check_call([
1578 sys.executable, '-c',
1579 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001580
Serhiy Storchaka43767632013-11-03 21:31:38 +02001581 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1582 def test_setregid(self):
1583 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1584 self.assertRaises(OSError, os.setregid, 0, 0)
1585 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1586 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001587
Serhiy Storchaka43767632013-11-03 21:31:38 +02001588 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1589 def test_setregid_neg1(self):
1590 # Needs to accept -1. We run this in a subprocess to avoid
1591 # altering the test runner's process state (issue8045).
1592 subprocess.check_call([
1593 sys.executable, '-c',
1594 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001595
Serhiy Storchaka43767632013-11-03 21:31:38 +02001596@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1597class Pep383Tests(unittest.TestCase):
1598 def setUp(self):
1599 if support.TESTFN_UNENCODABLE:
1600 self.dir = support.TESTFN_UNENCODABLE
1601 elif support.TESTFN_NONASCII:
1602 self.dir = support.TESTFN_NONASCII
1603 else:
1604 self.dir = support.TESTFN
1605 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001606
Serhiy Storchaka43767632013-11-03 21:31:38 +02001607 bytesfn = []
1608 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001609 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001610 fn = os.fsencode(fn)
1611 except UnicodeEncodeError:
1612 return
1613 bytesfn.append(fn)
1614 add_filename(support.TESTFN_UNICODE)
1615 if support.TESTFN_UNENCODABLE:
1616 add_filename(support.TESTFN_UNENCODABLE)
1617 if support.TESTFN_NONASCII:
1618 add_filename(support.TESTFN_NONASCII)
1619 if not bytesfn:
1620 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001621
Serhiy Storchaka43767632013-11-03 21:31:38 +02001622 self.unicodefn = set()
1623 os.mkdir(self.dir)
1624 try:
1625 for fn in bytesfn:
1626 support.create_empty_file(os.path.join(self.bdir, fn))
1627 fn = os.fsdecode(fn)
1628 if fn in self.unicodefn:
1629 raise ValueError("duplicate filename")
1630 self.unicodefn.add(fn)
1631 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001632 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001633 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001634
Serhiy Storchaka43767632013-11-03 21:31:38 +02001635 def tearDown(self):
1636 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001637
Serhiy Storchaka43767632013-11-03 21:31:38 +02001638 def test_listdir(self):
1639 expected = self.unicodefn
1640 found = set(os.listdir(self.dir))
1641 self.assertEqual(found, expected)
1642 # test listdir without arguments
1643 current_directory = os.getcwd()
1644 try:
1645 os.chdir(os.sep)
1646 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1647 finally:
1648 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001649
Serhiy Storchaka43767632013-11-03 21:31:38 +02001650 def test_open(self):
1651 for fn in self.unicodefn:
1652 f = open(os.path.join(self.dir, fn), 'rb')
1653 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001654
Serhiy Storchaka43767632013-11-03 21:31:38 +02001655 @unittest.skipUnless(hasattr(os, 'statvfs'),
1656 "need os.statvfs()")
1657 def test_statvfs(self):
1658 # issue #9645
1659 for fn in self.unicodefn:
1660 # should not fail with file not found error
1661 fullname = os.path.join(self.dir, fn)
1662 os.statvfs(fullname)
1663
1664 def test_stat(self):
1665 for fn in self.unicodefn:
1666 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001667
Brian Curtineb24d742010-04-12 17:16:38 +00001668@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1669class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001670 def _kill(self, sig):
1671 # Start sys.executable as a subprocess and communicate from the
1672 # subprocess to the parent that the interpreter is ready. When it
1673 # becomes ready, send *sig* via os.kill to the subprocess and check
1674 # that the return code is equal to *sig*.
1675 import ctypes
1676 from ctypes import wintypes
1677 import msvcrt
1678
1679 # Since we can't access the contents of the process' stdout until the
1680 # process has exited, use PeekNamedPipe to see what's inside stdout
1681 # without waiting. This is done so we can tell that the interpreter
1682 # is started and running at a point where it could handle a signal.
1683 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1684 PeekNamedPipe.restype = wintypes.BOOL
1685 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1686 ctypes.POINTER(ctypes.c_char), # stdout buf
1687 wintypes.DWORD, # Buffer size
1688 ctypes.POINTER(wintypes.DWORD), # bytes read
1689 ctypes.POINTER(wintypes.DWORD), # bytes avail
1690 ctypes.POINTER(wintypes.DWORD)) # bytes left
1691 msg = "running"
1692 proc = subprocess.Popen([sys.executable, "-c",
1693 "import sys;"
1694 "sys.stdout.write('{}');"
1695 "sys.stdout.flush();"
1696 "input()".format(msg)],
1697 stdout=subprocess.PIPE,
1698 stderr=subprocess.PIPE,
1699 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001700 self.addCleanup(proc.stdout.close)
1701 self.addCleanup(proc.stderr.close)
1702 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001703
1704 count, max = 0, 100
1705 while count < max and proc.poll() is None:
1706 # Create a string buffer to store the result of stdout from the pipe
1707 buf = ctypes.create_string_buffer(len(msg))
1708 # Obtain the text currently in proc.stdout
1709 # Bytes read/avail/left are left as NULL and unused
1710 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1711 buf, ctypes.sizeof(buf), None, None, None)
1712 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1713 if buf.value:
1714 self.assertEqual(msg, buf.value.decode())
1715 break
1716 time.sleep(0.1)
1717 count += 1
1718 else:
1719 self.fail("Did not receive communication from the subprocess")
1720
Brian Curtineb24d742010-04-12 17:16:38 +00001721 os.kill(proc.pid, sig)
1722 self.assertEqual(proc.wait(), sig)
1723
1724 def test_kill_sigterm(self):
1725 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001726 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001727
1728 def test_kill_int(self):
1729 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001730 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001731
1732 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001733 tagname = "test_os_%s" % uuid.uuid1()
1734 m = mmap.mmap(-1, 1, tagname)
1735 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001736 # Run a script which has console control handling enabled.
1737 proc = subprocess.Popen([sys.executable,
1738 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001739 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001740 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1741 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001742 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001743 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001744 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001745 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001746 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001747 count += 1
1748 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001749 # Forcefully kill the process if we weren't able to signal it.
1750 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001751 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001752 os.kill(proc.pid, event)
1753 # proc.send_signal(event) could also be done here.
1754 # Allow time for the signal to be passed and the process to exit.
1755 time.sleep(0.5)
1756 if not proc.poll():
1757 # Forcefully kill the process if we weren't able to signal it.
1758 os.kill(proc.pid, signal.SIGINT)
1759 self.fail("subprocess did not stop on {}".format(name))
1760
1761 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1762 def test_CTRL_C_EVENT(self):
1763 from ctypes import wintypes
1764 import ctypes
1765
1766 # Make a NULL value by creating a pointer with no argument.
1767 NULL = ctypes.POINTER(ctypes.c_int)()
1768 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1769 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1770 wintypes.BOOL)
1771 SetConsoleCtrlHandler.restype = wintypes.BOOL
1772
1773 # Calling this with NULL and FALSE causes the calling process to
1774 # handle CTRL+C, rather than ignore it. This property is inherited
1775 # by subprocesses.
1776 SetConsoleCtrlHandler(NULL, 0)
1777
1778 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1779
1780 def test_CTRL_BREAK_EVENT(self):
1781 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1782
1783
Brian Curtind40e6f72010-07-08 21:39:08 +00001784@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001785class Win32ListdirTests(unittest.TestCase):
1786 """Test listdir on Windows."""
1787
1788 def setUp(self):
1789 self.created_paths = []
1790 for i in range(2):
1791 dir_name = 'SUB%d' % i
1792 dir_path = os.path.join(support.TESTFN, dir_name)
1793 file_name = 'FILE%d' % i
1794 file_path = os.path.join(support.TESTFN, file_name)
1795 os.makedirs(dir_path)
1796 with open(file_path, 'w') as f:
1797 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1798 self.created_paths.extend([dir_name, file_name])
1799 self.created_paths.sort()
1800
1801 def tearDown(self):
1802 shutil.rmtree(support.TESTFN)
1803
1804 def test_listdir_no_extended_path(self):
1805 """Test when the path is not an "extended" path."""
1806 # unicode
1807 self.assertEqual(
1808 sorted(os.listdir(support.TESTFN)),
1809 self.created_paths)
1810 # bytes
1811 self.assertEqual(
1812 sorted(os.listdir(os.fsencode(support.TESTFN))),
1813 [os.fsencode(path) for path in self.created_paths])
1814
1815 def test_listdir_extended_path(self):
1816 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01001817 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01001818 # unicode
1819 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
1820 self.assertEqual(
1821 sorted(os.listdir(path)),
1822 self.created_paths)
1823 # bytes
1824 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
1825 self.assertEqual(
1826 sorted(os.listdir(path)),
1827 [os.fsencode(path) for path in self.created_paths])
1828
1829
1830@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001831@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001832class Win32SymlinkTests(unittest.TestCase):
1833 filelink = 'filelinktest'
1834 filelink_target = os.path.abspath(__file__)
1835 dirlink = 'dirlinktest'
1836 dirlink_target = os.path.dirname(filelink_target)
1837 missing_link = 'missing link'
1838
1839 def setUp(self):
1840 assert os.path.exists(self.dirlink_target)
1841 assert os.path.exists(self.filelink_target)
1842 assert not os.path.exists(self.dirlink)
1843 assert not os.path.exists(self.filelink)
1844 assert not os.path.exists(self.missing_link)
1845
1846 def tearDown(self):
1847 if os.path.exists(self.filelink):
1848 os.remove(self.filelink)
1849 if os.path.exists(self.dirlink):
1850 os.rmdir(self.dirlink)
1851 if os.path.lexists(self.missing_link):
1852 os.remove(self.missing_link)
1853
1854 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001855 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001856 self.assertTrue(os.path.exists(self.dirlink))
1857 self.assertTrue(os.path.isdir(self.dirlink))
1858 self.assertTrue(os.path.islink(self.dirlink))
1859 self.check_stat(self.dirlink, self.dirlink_target)
1860
1861 def test_file_link(self):
1862 os.symlink(self.filelink_target, self.filelink)
1863 self.assertTrue(os.path.exists(self.filelink))
1864 self.assertTrue(os.path.isfile(self.filelink))
1865 self.assertTrue(os.path.islink(self.filelink))
1866 self.check_stat(self.filelink, self.filelink_target)
1867
1868 def _create_missing_dir_link(self):
1869 'Create a "directory" link to a non-existent target'
1870 linkname = self.missing_link
1871 if os.path.lexists(linkname):
1872 os.remove(linkname)
1873 target = r'c:\\target does not exist.29r3c740'
1874 assert not os.path.exists(target)
1875 target_is_dir = True
1876 os.symlink(target, linkname, target_is_dir)
1877
1878 def test_remove_directory_link_to_missing_target(self):
1879 self._create_missing_dir_link()
1880 # For compatibility with Unix, os.remove will check the
1881 # directory status and call RemoveDirectory if the symlink
1882 # was created with target_is_dir==True.
1883 os.remove(self.missing_link)
1884
1885 @unittest.skip("currently fails; consider for improvement")
1886 def test_isdir_on_directory_link_to_missing_target(self):
1887 self._create_missing_dir_link()
1888 # consider having isdir return true for directory links
1889 self.assertTrue(os.path.isdir(self.missing_link))
1890
1891 @unittest.skip("currently fails; consider for improvement")
1892 def test_rmdir_on_directory_link_to_missing_target(self):
1893 self._create_missing_dir_link()
1894 # consider allowing rmdir to remove directory links
1895 os.rmdir(self.missing_link)
1896
1897 def check_stat(self, link, target):
1898 self.assertEqual(os.stat(link), os.stat(target))
1899 self.assertNotEqual(os.lstat(link), os.stat(link))
1900
Brian Curtind25aef52011-06-13 15:16:04 -05001901 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001902 with warnings.catch_warnings():
1903 warnings.simplefilter("ignore", DeprecationWarning)
1904 self.assertEqual(os.stat(bytes_link), os.stat(target))
1905 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001906
1907 def test_12084(self):
1908 level1 = os.path.abspath(support.TESTFN)
1909 level2 = os.path.join(level1, "level2")
1910 level3 = os.path.join(level2, "level3")
1911 try:
1912 os.mkdir(level1)
1913 os.mkdir(level2)
1914 os.mkdir(level3)
1915
1916 file1 = os.path.abspath(os.path.join(level1, "file1"))
1917
1918 with open(file1, "w") as f:
1919 f.write("file1")
1920
1921 orig_dir = os.getcwd()
1922 try:
1923 os.chdir(level2)
1924 link = os.path.join(level2, "link")
1925 os.symlink(os.path.relpath(file1), "link")
1926 self.assertIn("link", os.listdir(os.getcwd()))
1927
1928 # Check os.stat calls from the same dir as the link
1929 self.assertEqual(os.stat(file1), os.stat("link"))
1930
1931 # Check os.stat calls from a dir below the link
1932 os.chdir(level1)
1933 self.assertEqual(os.stat(file1),
1934 os.stat(os.path.relpath(link)))
1935
1936 # Check os.stat calls from a dir above the link
1937 os.chdir(level3)
1938 self.assertEqual(os.stat(file1),
1939 os.stat(os.path.relpath(link)))
1940 finally:
1941 os.chdir(orig_dir)
1942 except OSError as err:
1943 self.fail(err)
1944 finally:
1945 os.remove(file1)
1946 shutil.rmtree(level1)
1947
Brian Curtind40e6f72010-07-08 21:39:08 +00001948
Tim Golden0321cf22014-05-05 19:46:17 +01001949@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1950class Win32JunctionTests(unittest.TestCase):
1951 junction = 'junctiontest'
1952 junction_target = os.path.dirname(os.path.abspath(__file__))
1953
1954 def setUp(self):
1955 assert os.path.exists(self.junction_target)
1956 assert not os.path.exists(self.junction)
1957
1958 def tearDown(self):
1959 if os.path.exists(self.junction):
1960 # os.rmdir delegates to Windows' RemoveDirectoryW,
1961 # which removes junction points safely.
1962 os.rmdir(self.junction)
1963
1964 def test_create_junction(self):
1965 _winapi.CreateJunction(self.junction_target, self.junction)
1966 self.assertTrue(os.path.exists(self.junction))
1967 self.assertTrue(os.path.isdir(self.junction))
1968
1969 # Junctions are not recognized as links.
1970 self.assertFalse(os.path.islink(self.junction))
1971
1972 def test_unlink_removes_junction(self):
1973 _winapi.CreateJunction(self.junction_target, self.junction)
1974 self.assertTrue(os.path.exists(self.junction))
1975
1976 os.unlink(self.junction)
1977 self.assertFalse(os.path.exists(self.junction))
1978
1979
Jason R. Coombs3a092862013-05-27 23:21:28 -04001980@support.skip_unless_symlink
1981class NonLocalSymlinkTests(unittest.TestCase):
1982
1983 def setUp(self):
1984 """
1985 Create this structure:
1986
1987 base
1988 \___ some_dir
1989 """
1990 os.makedirs('base/some_dir')
1991
1992 def tearDown(self):
1993 shutil.rmtree('base')
1994
1995 def test_directory_link_nonlocal(self):
1996 """
1997 The symlink target should resolve relative to the link, not relative
1998 to the current directory.
1999
2000 Then, link base/some_link -> base/some_dir and ensure that some_link
2001 is resolved as a directory.
2002
2003 In issue13772, it was discovered that directory detection failed if
2004 the symlink target was not specified relative to the current
2005 directory, which was a defect in the implementation.
2006 """
2007 src = os.path.join('base', 'some_link')
2008 os.symlink('some_dir', src)
2009 assert os.path.isdir(src)
2010
2011
Victor Stinnere8d51452010-08-19 01:05:19 +00002012class FSEncodingTests(unittest.TestCase):
2013 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002014 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2015 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002016
Victor Stinnere8d51452010-08-19 01:05:19 +00002017 def test_identity(self):
2018 # assert fsdecode(fsencode(x)) == x
2019 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2020 try:
2021 bytesfn = os.fsencode(fn)
2022 except UnicodeEncodeError:
2023 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002024 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002025
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002026
Brett Cannonefb00c02012-02-29 18:31:31 -05002027
2028class DeviceEncodingTests(unittest.TestCase):
2029
2030 def test_bad_fd(self):
2031 # Return None when an fd doesn't actually exist.
2032 self.assertIsNone(os.device_encoding(123456))
2033
Philip Jenveye308b7c2012-02-29 16:16:15 -08002034 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2035 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002036 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002037 def test_device_encoding(self):
2038 encoding = os.device_encoding(0)
2039 self.assertIsNotNone(encoding)
2040 self.assertTrue(codecs.lookup(encoding))
2041
2042
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002043class PidTests(unittest.TestCase):
2044 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2045 def test_getppid(self):
2046 p = subprocess.Popen([sys.executable, '-c',
2047 'import os; print(os.getppid())'],
2048 stdout=subprocess.PIPE)
2049 stdout, _ = p.communicate()
2050 # We are the parent of our subprocess
2051 self.assertEqual(int(stdout), os.getpid())
2052
2053
Brian Curtin0151b8e2010-09-24 13:43:43 +00002054# The introduction of this TestCase caused at least two different errors on
2055# *nix buildbots. Temporarily skip this to let the buildbots move along.
2056@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002057@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2058class LoginTests(unittest.TestCase):
2059 def test_getlogin(self):
2060 user_name = os.getlogin()
2061 self.assertNotEqual(len(user_name), 0)
2062
2063
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002064@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2065 "needs os.getpriority and os.setpriority")
2066class ProgramPriorityTests(unittest.TestCase):
2067 """Tests for os.getpriority() and os.setpriority()."""
2068
2069 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002070
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002071 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2072 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2073 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002074 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2075 if base >= 19 and new_prio <= 19:
2076 raise unittest.SkipTest(
2077 "unable to reliably test setpriority at current nice level of %s" % base)
2078 else:
2079 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002080 finally:
2081 try:
2082 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2083 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002084 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002085 raise
2086
2087
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002088if threading is not None:
2089 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002090
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002091 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002092
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002093 def __init__(self, conn):
2094 asynchat.async_chat.__init__(self, conn)
2095 self.in_buffer = []
2096 self.closed = False
2097 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002098
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002099 def handle_read(self):
2100 data = self.recv(4096)
2101 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002102
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002103 def get_data(self):
2104 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002105
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002106 def handle_close(self):
2107 self.close()
2108 self.closed = True
2109
2110 def handle_error(self):
2111 raise
2112
2113 def __init__(self, address):
2114 threading.Thread.__init__(self)
2115 asyncore.dispatcher.__init__(self)
2116 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2117 self.bind(address)
2118 self.listen(5)
2119 self.host, self.port = self.socket.getsockname()[:2]
2120 self.handler_instance = None
2121 self._active = False
2122 self._active_lock = threading.Lock()
2123
2124 # --- public API
2125
2126 @property
2127 def running(self):
2128 return self._active
2129
2130 def start(self):
2131 assert not self.running
2132 self.__flag = threading.Event()
2133 threading.Thread.start(self)
2134 self.__flag.wait()
2135
2136 def stop(self):
2137 assert self.running
2138 self._active = False
2139 self.join()
2140
2141 def wait(self):
2142 # wait for handler connection to be closed, then stop the server
2143 while not getattr(self.handler_instance, "closed", False):
2144 time.sleep(0.001)
2145 self.stop()
2146
2147 # --- internals
2148
2149 def run(self):
2150 self._active = True
2151 self.__flag.set()
2152 while self._active and asyncore.socket_map:
2153 self._active_lock.acquire()
2154 asyncore.loop(timeout=0.001, count=1)
2155 self._active_lock.release()
2156 asyncore.close_all()
2157
2158 def handle_accept(self):
2159 conn, addr = self.accept()
2160 self.handler_instance = self.Handler(conn)
2161
2162 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002163 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002164 handle_read = handle_connect
2165
2166 def writable(self):
2167 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002168
2169 def handle_error(self):
2170 raise
2171
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002172
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002173@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002174@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2175class TestSendfile(unittest.TestCase):
2176
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002177 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002178 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002179 not sys.platform.startswith("solaris") and \
2180 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002181 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2182 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002183
2184 @classmethod
2185 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002186 cls.key = support.threading_setup()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002187 with open(support.TESTFN, "wb") as f:
2188 f.write(cls.DATA)
2189
2190 @classmethod
2191 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002192 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002193 support.unlink(support.TESTFN)
2194
2195 def setUp(self):
2196 self.server = SendfileTestServer((support.HOST, 0))
2197 self.server.start()
2198 self.client = socket.socket()
2199 self.client.connect((self.server.host, self.server.port))
2200 self.client.settimeout(1)
2201 # synchronize by waiting for "220 ready" response
2202 self.client.recv(1024)
2203 self.sockno = self.client.fileno()
2204 self.file = open(support.TESTFN, 'rb')
2205 self.fileno = self.file.fileno()
2206
2207 def tearDown(self):
2208 self.file.close()
2209 self.client.close()
2210 if self.server.running:
2211 self.server.stop()
2212
2213 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2214 """A higher level wrapper representing how an application is
2215 supposed to use sendfile().
2216 """
2217 while 1:
2218 try:
2219 if self.SUPPORT_HEADERS_TRAILERS:
2220 return os.sendfile(sock, file, offset, nbytes, headers,
2221 trailers)
2222 else:
2223 return os.sendfile(sock, file, offset, nbytes)
2224 except OSError as err:
2225 if err.errno == errno.ECONNRESET:
2226 # disconnected
2227 raise
2228 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2229 # we have to retry send data
2230 continue
2231 else:
2232 raise
2233
2234 def test_send_whole_file(self):
2235 # normal send
2236 total_sent = 0
2237 offset = 0
2238 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002239 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002240 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2241 if sent == 0:
2242 break
2243 offset += sent
2244 total_sent += sent
2245 self.assertTrue(sent <= nbytes)
2246 self.assertEqual(offset, total_sent)
2247
2248 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002249 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002250 self.client.close()
2251 self.server.wait()
2252 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002253 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002254 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002255
2256 def test_send_at_certain_offset(self):
2257 # start sending a file at a certain offset
2258 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002259 offset = len(self.DATA) // 2
2260 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002261 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002262 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002263 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2264 if sent == 0:
2265 break
2266 offset += sent
2267 total_sent += sent
2268 self.assertTrue(sent <= nbytes)
2269
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002270 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002271 self.client.close()
2272 self.server.wait()
2273 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002274 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002275 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002276 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002277 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002278
2279 def test_offset_overflow(self):
2280 # specify an offset > file size
2281 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002282 try:
2283 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2284 except OSError as e:
2285 # Solaris can raise EINVAL if offset >= file length, ignore.
2286 if e.errno != errno.EINVAL:
2287 raise
2288 else:
2289 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002290 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002291 self.client.close()
2292 self.server.wait()
2293 data = self.server.handler_instance.get_data()
2294 self.assertEqual(data, b'')
2295
2296 def test_invalid_offset(self):
2297 with self.assertRaises(OSError) as cm:
2298 os.sendfile(self.sockno, self.fileno, -1, 4096)
2299 self.assertEqual(cm.exception.errno, errno.EINVAL)
2300
2301 # --- headers / trailers tests
2302
Serhiy Storchaka43767632013-11-03 21:31:38 +02002303 @requires_headers_trailers
2304 def test_headers(self):
2305 total_sent = 0
2306 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2307 headers=[b"x" * 512])
2308 total_sent += sent
2309 offset = 4096
2310 nbytes = 4096
2311 while 1:
2312 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2313 offset, nbytes)
2314 if sent == 0:
2315 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002316 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002317 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002318
Serhiy Storchaka43767632013-11-03 21:31:38 +02002319 expected_data = b"x" * 512 + self.DATA
2320 self.assertEqual(total_sent, len(expected_data))
2321 self.client.close()
2322 self.server.wait()
2323 data = self.server.handler_instance.get_data()
2324 self.assertEqual(hash(data), hash(expected_data))
2325
2326 @requires_headers_trailers
2327 def test_trailers(self):
2328 TESTFN2 = support.TESTFN + "2"
2329 file_data = b"abcdef"
2330 with open(TESTFN2, 'wb') as f:
2331 f.write(file_data)
2332 with open(TESTFN2, 'rb')as f:
2333 self.addCleanup(os.remove, TESTFN2)
2334 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2335 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002336 self.client.close()
2337 self.server.wait()
2338 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002339 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002340
Serhiy Storchaka43767632013-11-03 21:31:38 +02002341 @requires_headers_trailers
2342 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2343 'test needs os.SF_NODISKIO')
2344 def test_flags(self):
2345 try:
2346 os.sendfile(self.sockno, self.fileno, 0, 4096,
2347 flags=os.SF_NODISKIO)
2348 except OSError as err:
2349 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2350 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002351
2352
Larry Hastings9cf065c2012-06-22 16:30:09 -07002353def supports_extended_attributes():
2354 if not hasattr(os, "setxattr"):
2355 return False
2356 try:
2357 with open(support.TESTFN, "wb") as fp:
2358 try:
2359 os.setxattr(fp.fileno(), b"user.test", b"")
2360 except OSError:
2361 return False
2362 finally:
2363 support.unlink(support.TESTFN)
2364 # Kernels < 2.6.39 don't respect setxattr flags.
2365 kernel_version = platform.release()
2366 m = re.match("2.6.(\d{1,2})", kernel_version)
2367 return m is None or int(m.group(1)) >= 39
2368
2369
2370@unittest.skipUnless(supports_extended_attributes(),
2371 "no non-broken extended attribute support")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002372class ExtendedAttributeTests(unittest.TestCase):
2373
2374 def tearDown(self):
2375 support.unlink(support.TESTFN)
2376
Larry Hastings9cf065c2012-06-22 16:30:09 -07002377 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002378 fn = support.TESTFN
2379 open(fn, "wb").close()
2380 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002381 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002382 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002383 init_xattr = listxattr(fn)
2384 self.assertIsInstance(init_xattr, list)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002385 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002386 xattr = set(init_xattr)
2387 xattr.add("user.test")
2388 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002389 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2390 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2391 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002392 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002393 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002394 self.assertEqual(cm.exception.errno, errno.EEXIST)
2395 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002396 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002397 self.assertEqual(cm.exception.errno, errno.ENODATA)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002398 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002399 xattr.add("user.test2")
2400 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002401 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002402 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002403 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002404 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002405 xattr.remove("user.test")
2406 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002407 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2408 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2409 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2410 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002411 many = sorted("user.test{}".format(i) for i in range(100))
2412 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002413 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002414 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002415
Larry Hastings9cf065c2012-06-22 16:30:09 -07002416 def _check_xattrs(self, *args, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002417 def make_bytes(s):
2418 return bytes(s, "ascii")
Larry Hastings9cf065c2012-06-22 16:30:09 -07002419 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002420 support.unlink(support.TESTFN)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002421 self._check_xattrs_str(make_bytes, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002422
2423 def test_simple(self):
2424 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2425 os.listxattr)
2426
2427 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002428 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2429 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002430
2431 def test_fds(self):
2432 def getxattr(path, *args):
2433 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002434 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002435 def setxattr(path, *args):
2436 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002437 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002438 def removexattr(path, *args):
2439 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002440 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002441 def listxattr(path, *args):
2442 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002443 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002444 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2445
2446
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002447@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2448class Win32DeprecatedBytesAPI(unittest.TestCase):
2449 def test_deprecated(self):
2450 import nt
2451 filename = os.fsencode(support.TESTFN)
2452 with warnings.catch_warnings():
2453 warnings.simplefilter("error", DeprecationWarning)
2454 for func, *args in (
2455 (nt._getfullpathname, filename),
2456 (nt._isdir, filename),
2457 (os.access, filename, os.R_OK),
2458 (os.chdir, filename),
2459 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01002460 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002461 (os.link, filename, filename),
2462 (os.listdir, filename),
2463 (os.lstat, filename),
2464 (os.mkdir, filename),
2465 (os.open, filename, os.O_RDONLY),
2466 (os.rename, filename, filename),
2467 (os.rmdir, filename),
2468 (os.startfile, filename),
2469 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002470 (os.unlink, filename),
2471 (os.utime, filename),
2472 ):
2473 self.assertRaises(DeprecationWarning, func, *args)
2474
Victor Stinner28216442011-11-16 00:34:44 +01002475 @support.skip_unless_symlink
2476 def test_symlink(self):
2477 filename = os.fsencode(support.TESTFN)
2478 with warnings.catch_warnings():
2479 warnings.simplefilter("error", DeprecationWarning)
2480 self.assertRaises(DeprecationWarning,
2481 os.symlink, filename, filename)
2482
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002483
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002484@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2485class TermsizeTests(unittest.TestCase):
2486 def test_does_not_crash(self):
2487 """Check if get_terminal_size() returns a meaningful value.
2488
2489 There's no easy portable way to actually check the size of the
2490 terminal, so let's check if it returns something sensible instead.
2491 """
2492 try:
2493 size = os.get_terminal_size()
2494 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002495 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002496 # Under win32 a generic OSError can be thrown if the
2497 # handle cannot be retrieved
2498 self.skipTest("failed to query terminal size")
2499 raise
2500
Antoine Pitroucfade362012-02-08 23:48:59 +01002501 self.assertGreaterEqual(size.columns, 0)
2502 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002503
2504 def test_stty_match(self):
2505 """Check if stty returns the same results
2506
2507 stty actually tests stdin, so get_terminal_size is invoked on
2508 stdin explicitly. If stty succeeded, then get_terminal_size()
2509 should work too.
2510 """
2511 try:
2512 size = subprocess.check_output(['stty', 'size']).decode().split()
2513 except (FileNotFoundError, subprocess.CalledProcessError):
2514 self.skipTest("stty invocation failed")
2515 expected = (int(size[1]), int(size[0])) # reversed order
2516
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002517 try:
2518 actual = os.get_terminal_size(sys.__stdin__.fileno())
2519 except OSError as e:
2520 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2521 # Under win32 a generic OSError can be thrown if the
2522 # handle cannot be retrieved
2523 self.skipTest("failed to query terminal size")
2524 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002525 self.assertEqual(expected, actual)
2526
2527
Victor Stinner292c8352012-10-30 02:17:38 +01002528class OSErrorTests(unittest.TestCase):
2529 def setUp(self):
2530 class Str(str):
2531 pass
2532
Victor Stinnerafe17062012-10-31 22:47:43 +01002533 self.bytes_filenames = []
2534 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002535 if support.TESTFN_UNENCODABLE is not None:
2536 decoded = support.TESTFN_UNENCODABLE
2537 else:
2538 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002539 self.unicode_filenames.append(decoded)
2540 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002541 if support.TESTFN_UNDECODABLE is not None:
2542 encoded = support.TESTFN_UNDECODABLE
2543 else:
2544 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002545 self.bytes_filenames.append(encoded)
2546 self.bytes_filenames.append(memoryview(encoded))
2547
2548 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002549
2550 def test_oserror_filename(self):
2551 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002552 (self.filenames, os.chdir,),
2553 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002554 (self.filenames, os.lstat,),
2555 (self.filenames, os.open, os.O_RDONLY),
2556 (self.filenames, os.rmdir,),
2557 (self.filenames, os.stat,),
2558 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002559 ]
2560 if sys.platform == "win32":
2561 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002562 (self.bytes_filenames, os.rename, b"dst"),
2563 (self.bytes_filenames, os.replace, b"dst"),
2564 (self.unicode_filenames, os.rename, "dst"),
2565 (self.unicode_filenames, os.replace, "dst"),
Victor Stinner64e039a2012-11-07 00:10:14 +01002566 # Issue #16414: Don't test undecodable names with listdir()
2567 # because of a Windows bug.
2568 #
2569 # With the ANSI code page 932, os.listdir(b'\xe7') return an
2570 # empty list (instead of failing), whereas os.listdir(b'\xff')
2571 # raises a FileNotFoundError. It looks like a Windows bug:
2572 # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
2573 # fails with ERROR_FILE_NOT_FOUND (2), instead of
2574 # ERROR_PATH_NOT_FOUND (3).
2575 (self.unicode_filenames, os.listdir,),
Victor Stinner292c8352012-10-30 02:17:38 +01002576 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002577 else:
2578 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002579 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002580 (self.filenames, os.rename, "dst"),
2581 (self.filenames, os.replace, "dst"),
2582 ))
2583 if hasattr(os, "chown"):
2584 funcs.append((self.filenames, os.chown, 0, 0))
2585 if hasattr(os, "lchown"):
2586 funcs.append((self.filenames, os.lchown, 0, 0))
2587 if hasattr(os, "truncate"):
2588 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002589 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002590 funcs.append((self.filenames, os.chflags, 0))
2591 if hasattr(os, "lchflags"):
2592 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002593 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002594 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002595 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002596 if sys.platform == "win32":
2597 funcs.append((self.bytes_filenames, os.link, b"dst"))
2598 funcs.append((self.unicode_filenames, os.link, "dst"))
2599 else:
2600 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002601 if hasattr(os, "listxattr"):
2602 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002603 (self.filenames, os.listxattr,),
2604 (self.filenames, os.getxattr, "user.test"),
2605 (self.filenames, os.setxattr, "user.test", b'user'),
2606 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002607 ))
2608 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002609 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002610 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002611 if sys.platform == "win32":
2612 funcs.append((self.unicode_filenames, os.readlink,))
2613 else:
2614 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002615
Victor Stinnerafe17062012-10-31 22:47:43 +01002616 for filenames, func, *func_args in funcs:
2617 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002618 try:
2619 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002620 except OSError as err:
Victor Stinner292c8352012-10-30 02:17:38 +01002621 self.assertIs(err.filename, name)
2622 else:
2623 self.fail("No exception thrown by {}".format(func))
2624
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002625class CPUCountTests(unittest.TestCase):
2626 def test_cpu_count(self):
2627 cpus = os.cpu_count()
2628 if cpus is not None:
2629 self.assertIsInstance(cpus, int)
2630 self.assertGreater(cpus, 0)
2631 else:
2632 self.skipTest("Could not determine the number of CPUs")
2633
Victor Stinnerdaf45552013-08-28 00:53:59 +02002634
2635class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002636 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002637 fd = os.open(__file__, os.O_RDONLY)
2638 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002639 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002640
Victor Stinnerdaf45552013-08-28 00:53:59 +02002641 os.set_inheritable(fd, True)
2642 self.assertEqual(os.get_inheritable(fd), True)
2643
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002644 @unittest.skipIf(fcntl is None, "need fcntl")
2645 def test_get_inheritable_cloexec(self):
2646 fd = os.open(__file__, os.O_RDONLY)
2647 self.addCleanup(os.close, fd)
2648 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002649
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002650 # clear FD_CLOEXEC flag
2651 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2652 flags &= ~fcntl.FD_CLOEXEC
2653 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002654
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002655 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002656
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002657 @unittest.skipIf(fcntl is None, "need fcntl")
2658 def test_set_inheritable_cloexec(self):
2659 fd = os.open(__file__, os.O_RDONLY)
2660 self.addCleanup(os.close, fd)
2661 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2662 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002663
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002664 os.set_inheritable(fd, True)
2665 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2666 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002667
Victor Stinnerdaf45552013-08-28 00:53:59 +02002668 def test_open(self):
2669 fd = os.open(__file__, os.O_RDONLY)
2670 self.addCleanup(os.close, fd)
2671 self.assertEqual(os.get_inheritable(fd), False)
2672
2673 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2674 def test_pipe(self):
2675 rfd, wfd = os.pipe()
2676 self.addCleanup(os.close, rfd)
2677 self.addCleanup(os.close, wfd)
2678 self.assertEqual(os.get_inheritable(rfd), False)
2679 self.assertEqual(os.get_inheritable(wfd), False)
2680
2681 def test_dup(self):
2682 fd1 = os.open(__file__, os.O_RDONLY)
2683 self.addCleanup(os.close, fd1)
2684
2685 fd2 = os.dup(fd1)
2686 self.addCleanup(os.close, fd2)
2687 self.assertEqual(os.get_inheritable(fd2), False)
2688
2689 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2690 def test_dup2(self):
2691 fd = os.open(__file__, os.O_RDONLY)
2692 self.addCleanup(os.close, fd)
2693
2694 # inheritable by default
2695 fd2 = os.open(__file__, os.O_RDONLY)
2696 try:
2697 os.dup2(fd, fd2)
2698 self.assertEqual(os.get_inheritable(fd2), True)
2699 finally:
2700 os.close(fd2)
2701
2702 # force non-inheritable
2703 fd3 = os.open(__file__, os.O_RDONLY)
2704 try:
2705 os.dup2(fd, fd3, inheritable=False)
2706 self.assertEqual(os.get_inheritable(fd3), False)
2707 finally:
2708 os.close(fd3)
2709
2710 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2711 def test_openpty(self):
2712 master_fd, slave_fd = os.openpty()
2713 self.addCleanup(os.close, master_fd)
2714 self.addCleanup(os.close, slave_fd)
2715 self.assertEqual(os.get_inheritable(master_fd), False)
2716 self.assertEqual(os.get_inheritable(slave_fd), False)
2717
2718
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002719@unittest.skipUnless(hasattr(os, 'get_blocking'),
2720 'needs os.get_blocking() and os.set_blocking()')
2721class BlockingTests(unittest.TestCase):
2722 def test_blocking(self):
2723 fd = os.open(__file__, os.O_RDONLY)
2724 self.addCleanup(os.close, fd)
2725 self.assertEqual(os.get_blocking(fd), True)
2726
2727 os.set_blocking(fd, False)
2728 self.assertEqual(os.get_blocking(fd), False)
2729
2730 os.set_blocking(fd, True)
2731 self.assertEqual(os.get_blocking(fd), True)
2732
2733
Yury Selivanov97e2e062014-09-26 12:33:06 -04002734
2735class ExportsTests(unittest.TestCase):
2736 def test_os_all(self):
2737 self.assertIn('open', os.__all__)
2738 self.assertIn('walk', os.__all__)
2739
2740
Victor Stinner6036e442015-03-08 01:58:04 +01002741class TestScandir(unittest.TestCase):
2742 def setUp(self):
2743 self.path = os.path.realpath(support.TESTFN)
2744 self.addCleanup(support.rmtree, self.path)
2745 os.mkdir(self.path)
2746
2747 def create_file(self, name="file.txt"):
2748 filename = os.path.join(self.path, name)
2749 with open(filename, "wb") as fp:
2750 fp.write(b'python')
2751 return filename
2752
2753 def get_entries(self, names):
2754 entries = dict((entry.name, entry)
2755 for entry in os.scandir(self.path))
2756 self.assertEqual(sorted(entries.keys()), names)
2757 return entries
2758
2759 def assert_stat_equal(self, stat1, stat2, skip_fields):
2760 if skip_fields:
2761 for attr in dir(stat1):
2762 if not attr.startswith("st_"):
2763 continue
2764 if attr in ("st_dev", "st_ino", "st_nlink"):
2765 continue
2766 self.assertEqual(getattr(stat1, attr),
2767 getattr(stat2, attr),
2768 (stat1, stat2, attr))
2769 else:
2770 self.assertEqual(stat1, stat2)
2771
2772 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
2773 self.assertEqual(entry.name, name)
2774 self.assertEqual(entry.path, os.path.join(self.path, name))
2775 self.assertEqual(entry.inode(),
2776 os.stat(entry.path, follow_symlinks=False).st_ino)
2777
2778 entry_stat = os.stat(entry.path)
2779 self.assertEqual(entry.is_dir(),
2780 stat.S_ISDIR(entry_stat.st_mode))
2781 self.assertEqual(entry.is_file(),
2782 stat.S_ISREG(entry_stat.st_mode))
2783 self.assertEqual(entry.is_symlink(),
2784 os.path.islink(entry.path))
2785
2786 entry_lstat = os.stat(entry.path, follow_symlinks=False)
2787 self.assertEqual(entry.is_dir(follow_symlinks=False),
2788 stat.S_ISDIR(entry_lstat.st_mode))
2789 self.assertEqual(entry.is_file(follow_symlinks=False),
2790 stat.S_ISREG(entry_lstat.st_mode))
2791
2792 self.assert_stat_equal(entry.stat(),
2793 entry_stat,
2794 os.name == 'nt' and not is_symlink)
2795 self.assert_stat_equal(entry.stat(follow_symlinks=False),
2796 entry_lstat,
2797 os.name == 'nt')
2798
2799 def test_attributes(self):
2800 link = hasattr(os, 'link')
2801 symlink = support.can_symlink()
2802
2803 dirname = os.path.join(self.path, "dir")
2804 os.mkdir(dirname)
2805 filename = self.create_file("file.txt")
2806 if link:
2807 os.link(filename, os.path.join(self.path, "link_file.txt"))
2808 if symlink:
2809 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
2810 target_is_directory=True)
2811 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
2812
2813 names = ['dir', 'file.txt']
2814 if link:
2815 names.append('link_file.txt')
2816 if symlink:
2817 names.extend(('symlink_dir', 'symlink_file.txt'))
2818 entries = self.get_entries(names)
2819
2820 entry = entries['dir']
2821 self.check_entry(entry, 'dir', True, False, False)
2822
2823 entry = entries['file.txt']
2824 self.check_entry(entry, 'file.txt', False, True, False)
2825
2826 if link:
2827 entry = entries['link_file.txt']
2828 self.check_entry(entry, 'link_file.txt', False, True, False)
2829
2830 if symlink:
2831 entry = entries['symlink_dir']
2832 self.check_entry(entry, 'symlink_dir', True, False, True)
2833
2834 entry = entries['symlink_file.txt']
2835 self.check_entry(entry, 'symlink_file.txt', False, True, True)
2836
2837 def get_entry(self, name):
2838 entries = list(os.scandir(self.path))
2839 self.assertEqual(len(entries), 1)
2840
2841 entry = entries[0]
2842 self.assertEqual(entry.name, name)
2843 return entry
2844
2845 def create_file_entry(self):
2846 filename = self.create_file()
2847 return self.get_entry(os.path.basename(filename))
2848
2849 def test_current_directory(self):
2850 filename = self.create_file()
2851 old_dir = os.getcwd()
2852 try:
2853 os.chdir(self.path)
2854
2855 # call scandir() without parameter: it must list the content
2856 # of the current directory
2857 entries = dict((entry.name, entry) for entry in os.scandir())
2858 self.assertEqual(sorted(entries.keys()),
2859 [os.path.basename(filename)])
2860 finally:
2861 os.chdir(old_dir)
2862
2863 def test_repr(self):
2864 entry = self.create_file_entry()
2865 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
2866
2867 def test_removed_dir(self):
2868 path = os.path.join(self.path, 'dir')
2869
2870 os.mkdir(path)
2871 entry = self.get_entry('dir')
2872 os.rmdir(path)
2873
2874 # On POSIX, is_dir() result depends if scandir() filled d_type or not
2875 if os.name == 'nt':
2876 self.assertTrue(entry.is_dir())
2877 self.assertFalse(entry.is_file())
2878 self.assertFalse(entry.is_symlink())
2879 if os.name == 'nt':
2880 self.assertRaises(FileNotFoundError, entry.inode)
2881 # don't fail
2882 entry.stat()
2883 entry.stat(follow_symlinks=False)
2884 else:
2885 self.assertGreater(entry.inode(), 0)
2886 self.assertRaises(FileNotFoundError, entry.stat)
2887 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
2888
2889 def test_removed_file(self):
2890 entry = self.create_file_entry()
2891 os.unlink(entry.path)
2892
2893 self.assertFalse(entry.is_dir())
2894 # On POSIX, is_dir() result depends if scandir() filled d_type or not
2895 if os.name == 'nt':
2896 self.assertTrue(entry.is_file())
2897 self.assertFalse(entry.is_symlink())
2898 if os.name == 'nt':
2899 self.assertRaises(FileNotFoundError, entry.inode)
2900 # don't fail
2901 entry.stat()
2902 entry.stat(follow_symlinks=False)
2903 else:
2904 self.assertGreater(entry.inode(), 0)
2905 self.assertRaises(FileNotFoundError, entry.stat)
2906 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
2907
2908 def test_broken_symlink(self):
2909 if not support.can_symlink():
2910 return self.skipTest('cannot create symbolic link')
2911
2912 filename = self.create_file("file.txt")
2913 os.symlink(filename,
2914 os.path.join(self.path, "symlink.txt"))
2915 entries = self.get_entries(['file.txt', 'symlink.txt'])
2916 entry = entries['symlink.txt']
2917 os.unlink(filename)
2918
2919 self.assertGreater(entry.inode(), 0)
2920 self.assertFalse(entry.is_dir())
2921 self.assertFalse(entry.is_file()) # broken symlink returns False
2922 self.assertFalse(entry.is_dir(follow_symlinks=False))
2923 self.assertFalse(entry.is_file(follow_symlinks=False))
2924 self.assertTrue(entry.is_symlink())
2925 self.assertRaises(FileNotFoundError, entry.stat)
2926 # don't fail
2927 entry.stat(follow_symlinks=False)
2928
2929 def test_bytes(self):
2930 if os.name == "nt":
2931 # On Windows, os.scandir(bytes) must raise an exception
2932 self.assertRaises(TypeError, os.scandir, b'.')
2933 return
2934
2935 self.create_file("file.txt")
2936
2937 path_bytes = os.fsencode(self.path)
2938 entries = list(os.scandir(path_bytes))
2939 self.assertEqual(len(entries), 1, entries)
2940 entry = entries[0]
2941
2942 self.assertEqual(entry.name, b'file.txt')
2943 self.assertEqual(entry.path,
2944 os.fsencode(os.path.join(self.path, 'file.txt')))
2945
2946 def test_empty_path(self):
2947 self.assertRaises(FileNotFoundError, os.scandir, '')
2948
2949 def test_consume_iterator_twice(self):
2950 self.create_file("file.txt")
2951 iterator = os.scandir(self.path)
2952
2953 entries = list(iterator)
2954 self.assertEqual(len(entries), 1, entries)
2955
2956 # check than consuming the iterator twice doesn't raise exception
2957 entries2 = list(iterator)
2958 self.assertEqual(len(entries2), 0, entries2)
2959
2960 def test_bad_path_type(self):
2961 for obj in [1234, 1.234, {}, []]:
2962 self.assertRaises(TypeError, os.scandir, obj)
2963
2964
Fred Drake2e2be372001-09-20 21:33:42 +00002965if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05002966 unittest.main()