# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.

import asynchat
import asyncore
import codecs
import contextlib
import decimal
import errno
import fractions
import getpass
import itertools
import locale
import mmap
import os
import pickle
import platform
import re
import shutil
import signal
import socket
import stat
import subprocess
import sys
import sysconfig
import time
import unittest
import uuid
import warnings
from test import support
# Optional modules: each of these is unavailable on some platforms.  When the
# import fails the name is bound to None (or an empty/fallback value) so the
# rest of the module can test for it.
try:
    import threading
except ImportError:
    threading = None
try:
    import resource
except ImportError:
    resource = None
try:
    import fcntl
except ImportError:
    fcntl = None
try:
    import _winapi
except ImportError:
    _winapi = None
try:
    import grp
    # Look the user name up once instead of once per group database entry.
    _current_user = getpass.getuser()
    # gids of every group that lists the current user as a member ...
    groups = [g.gr_gid for g in grp.getgrall() if _current_user in g.gr_mem]
    if hasattr(os, 'getgid'):
        # ... plus the gid of the process itself, if it is not listed yet.
        process_gid = os.getgid()
        if process_gid not in groups:
            groups.append(process_gid)
except ImportError:
    groups = []
try:
    import pwd
    # uids of every entry in the password database
    all_users = [u.pw_uid for u in pwd.getpwall()]
except ImportError:
    all_users = []
try:
    from _testcapi import INT_MAX, PY_SSIZE_T_MAX
except ImportError:
    # Without _testcapi both limits fall back to sys.maxsize.
    INT_MAX = PY_SSIZE_T_MAX = sys.maxsize

from test.support.script_helper import assert_python_ok

# True when the tests run with an effective uid of 0 on a platform that has
# os.geteuid(); False everywhere else.
root_in_posix = False
if hasattr(os, 'geteuid'):
    root_in_posix = (os.geteuid() == 0)

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
if hasattr(sys, 'thread_info') and sys.thread_info.version:
    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
else:
    USING_LINUXTHREADS = False

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0

# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for the file-descriptor level functions of the os module.

    The tests operate on support.TESTFN, which is removed both before and
    after every test (tearDown is the same function as setUp).
    """

    def setUp(self):
        # lexists() instead of exists(): test_symlink_keywords may leave a
        # symlink to a non-existent target behind.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count of
        # its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # Rewind the descriptor to the start of the file.
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        with open(support.TESTFN, "wb") as fp:
            fp.write(b'test')
        self.addCleanup(support.unlink, support.TESTFN)

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when one of the calls above fails,
            # so a failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and check it exits with 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file that fdopen_helper() opens.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        # The existing destination is overwritten and the source disappears.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every os.open() parameter can be passed by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every os.symlink() parameter can be passed by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user


Guido van Rossum98bf58f2001-10-18 20:34:25 +0000226# Test attributes on return values from os.*stat* family.
227class StatAttributeTests(unittest.TestCase):
228 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200229 self.fname = support.TESTFN
230 self.addCleanup(support.unlink, self.fname)
231 with open(self.fname, 'wb') as fp:
232 fp.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000233
Serhiy Storchaka43767632013-11-03 21:31:38 +0200234 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000235 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000236 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000237
238 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000239 self.assertEqual(result[stat.ST_SIZE], 3)
240 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000241
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000242 # Make sure all the attributes are there
243 members = dir(result)
244 for name in dir(stat):
245 if name[:3] == 'ST_':
246 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000247 if name.endswith("TIME"):
248 def trunc(x): return int(x)
249 else:
250 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000251 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000252 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000253 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000254
Larry Hastings6fe20b32012-04-19 15:07:49 -0700255 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700256 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700257 for name in 'st_atime st_mtime st_ctime'.split():
258 floaty = int(getattr(result, name) * 100000)
259 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700260 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700261
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000262 try:
263 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200264 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000265 except IndexError:
266 pass
267
268 # Make sure that assignment fails
269 try:
270 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200271 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000272 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000273 pass
274
275 try:
276 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200277 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000278 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000279 pass
280
281 try:
282 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200283 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284 except AttributeError:
285 pass
286
287 # Use the stat_result constructor with a too-short tuple.
288 try:
289 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200290 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000291 except TypeError:
292 pass
293
Ezio Melotti42da6632011-03-15 05:18:48 +0200294 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000295 try:
296 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
297 except TypeError:
298 pass
299
Antoine Pitrou38425292010-09-21 18:19:07 +0000300 def test_stat_attributes(self):
301 self.check_stat_attributes(self.fname)
302
303 def test_stat_attributes_bytes(self):
304 try:
305 fname = self.fname.encode(sys.getfilesystemencoding())
306 except UnicodeEncodeError:
307 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100308 with warnings.catch_warnings():
309 warnings.simplefilter("ignore", DeprecationWarning)
310 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000311
Christian Heimes25827622013-10-12 01:27:08 +0200312 def test_stat_result_pickle(self):
313 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200314 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
315 p = pickle.dumps(result, proto)
316 self.assertIn(b'stat_result', p)
317 if proto < 4:
318 self.assertIn(b'cos\nstat_result\n', p)
319 unpickled = pickle.loads(p)
320 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200321
Serhiy Storchaka43767632013-11-03 21:31:38 +0200322 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000323 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000324 try:
325 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000326 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000327 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000328 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200329 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000330
331 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000332 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000333
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000334 # Make sure all the attributes are there.
335 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
336 'ffree', 'favail', 'flag', 'namemax')
337 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000338 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000339
340 # Make sure that assignment really fails
341 try:
342 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200343 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000344 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000345 pass
346
347 try:
348 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200349 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000350 except AttributeError:
351 pass
352
353 # Use the constructor with a too-short tuple.
354 try:
355 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200356 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000357 except TypeError:
358 pass
359
Ezio Melotti42da6632011-03-15 05:18:48 +0200360 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000361 try:
362 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
363 except TypeError:
364 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000365
Christian Heimes25827622013-10-12 01:27:08 +0200366 @unittest.skipUnless(hasattr(os, 'statvfs'),
367 "need os.statvfs()")
368 def test_statvfs_result_pickle(self):
369 try:
370 result = os.statvfs(self.fname)
371 except OSError as e:
372 # On AtheOS, glibc always returns ENOSYS
373 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200374 self.skipTest('os.statvfs() failed with ENOSYS')
375
Serhiy Storchakabad12572014-12-15 14:03:42 +0200376 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
377 p = pickle.dumps(result, proto)
378 self.assertIn(b'statvfs_result', p)
379 if proto < 4:
380 self.assertIn(b'cos\nstatvfs_result\n', p)
381 unpickled = pickle.loads(p)
382 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200383
Serhiy Storchaka43767632013-11-03 21:31:38 +0200384 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
385 def test_1686475(self):
386 # Verify that an open file can be stat'ed
387 try:
388 os.stat(r"c:\pagefile.sys")
389 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600390 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200391 except OSError as e:
392 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000393
Serhiy Storchaka43767632013-11-03 21:31:38 +0200394 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
395 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
396 def test_15261(self):
397 # Verify that stat'ing a closed fd does not cause crash
398 r, w = os.pipe()
399 try:
400 os.stat(r) # should not raise error
401 finally:
402 os.close(r)
403 os.close(w)
404 with self.assertRaises(OSError) as ctx:
405 os.stat(r)
406 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100407
Zachary Ware63f277b2014-06-19 09:46:37 -0500408 def check_file_attributes(self, result):
409 self.assertTrue(hasattr(result, 'st_file_attributes'))
410 self.assertTrue(isinstance(result.st_file_attributes, int))
411 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
412
413 @unittest.skipUnless(sys.platform == "win32",
414 "st_file_attributes is Win32 specific")
415 def test_file_attributes(self):
416 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
417 result = os.stat(self.fname)
418 self.check_file_attributes(result)
419 self.assertEqual(
420 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
421 0)
422
423 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200424 dirname = support.TESTFN + "dir"
425 os.mkdir(dirname)
426 self.addCleanup(os.rmdir, dirname)
427
428 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500429 self.check_file_attributes(result)
430 self.assertEqual(
431 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
432 stat.FILE_ATTRIBUTE_DIRECTORY)
433
Victor Stinner47aacc82015-06-12 17:26:23 +0200434
435class UtimeTests(unittest.TestCase):
436 def setUp(self):
437 self.dirname = support.TESTFN
438 self.fname = os.path.join(self.dirname, "f1")
439
440 self.addCleanup(support.rmtree, self.dirname)
441 os.mkdir(self.dirname)
442 with open(self.fname, 'wb') as fp:
443 fp.write(b"ABC")
444
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200445 def restore_float_times(state):
446 with warnings.catch_warnings():
447 warnings.simplefilter("ignore", DeprecationWarning)
448
449 os.stat_float_times(state)
450
Victor Stinner47aacc82015-06-12 17:26:23 +0200451 # ensure that st_atime and st_mtime are float
452 with warnings.catch_warnings():
453 warnings.simplefilter("ignore", DeprecationWarning)
454
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200455 old_float_times = os.stat_float_times(-1)
456 self.addCleanup(restore_float_times, old_float_times)
Victor Stinner47aacc82015-06-12 17:26:23 +0200457
458 os.stat_float_times(True)
459
460 def support_subsecond(self, filename):
461 # Heuristic to check if the filesystem supports timestamp with
462 # subsecond resolution: check if float and int timestamps are different
463 st = os.stat(filename)
464 return ((st.st_atime != st[7])
465 or (st.st_mtime != st[8])
466 or (st.st_ctime != st[9]))
467
468 def _test_utime(self, set_time, filename=None):
469 if not filename:
470 filename = self.fname
471
472 support_subsecond = self.support_subsecond(filename)
473 if support_subsecond:
474 # Timestamp with a resolution of 1 microsecond (10^-6).
475 #
476 # The resolution of the C internal function used by os.utime()
477 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
478 # test with a resolution of 1 ns requires more work:
479 # see the issue #15745.
480 atime_ns = 1002003000 # 1.002003 seconds
481 mtime_ns = 4005006000 # 4.005006 seconds
482 else:
483 # use a resolution of 1 second
484 atime_ns = 5 * 10**9
485 mtime_ns = 8 * 10**9
486
487 set_time(filename, (atime_ns, mtime_ns))
488 st = os.stat(filename)
489
490 if support_subsecond:
491 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
492 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
493 else:
494 self.assertEqual(st.st_atime, atime_ns * 1e-9)
495 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
496 self.assertEqual(st.st_atime_ns, atime_ns)
497 self.assertEqual(st.st_mtime_ns, mtime_ns)
498
499 def test_utime(self):
500 def set_time(filename, ns):
501 # test the ns keyword parameter
502 os.utime(filename, ns=ns)
503 self._test_utime(set_time)
504
505 @staticmethod
506 def ns_to_sec(ns):
507 # Convert a number of nanosecond (int) to a number of seconds (float).
508 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
509 # issue, os.utime() rounds towards minus infinity.
510 return (ns * 1e-9) + 0.5e-9
511
512 def test_utime_by_indexed(self):
513 # pass times as floating point seconds as the second indexed parameter
514 def set_time(filename, ns):
515 atime_ns, mtime_ns = ns
516 atime = self.ns_to_sec(atime_ns)
517 mtime = self.ns_to_sec(mtime_ns)
518 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
519 # or utime(time_t)
520 os.utime(filename, (atime, mtime))
521 self._test_utime(set_time)
522
523 def test_utime_by_times(self):
524 def set_time(filename, ns):
525 atime_ns, mtime_ns = ns
526 atime = self.ns_to_sec(atime_ns)
527 mtime = self.ns_to_sec(mtime_ns)
528 # test the times keyword parameter
529 os.utime(filename, times=(atime, mtime))
530 self._test_utime(set_time)
531
532 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
533 "follow_symlinks support for utime required "
534 "for this test.")
535 def test_utime_nofollow_symlinks(self):
536 def set_time(filename, ns):
537 # use follow_symlinks=False to test utimensat(timespec)
538 # or lutimes(timeval)
539 os.utime(filename, ns=ns, follow_symlinks=False)
540 self._test_utime(set_time)
541
542 @unittest.skipUnless(os.utime in os.supports_fd,
543 "fd support for utime required for this test.")
544 def test_utime_fd(self):
545 def set_time(filename, ns):
546 with open(filename, 'wb') as fp:
547 # use a file descriptor to test futimens(timespec)
548 # or futimes(timeval)
549 os.utime(fp.fileno(), ns=ns)
550 self._test_utime(set_time)
551
552 @unittest.skipUnless(os.utime in os.supports_dir_fd,
553 "dir_fd support for utime required for this test.")
554 def test_utime_dir_fd(self):
555 def set_time(filename, ns):
556 dirname, name = os.path.split(filename)
557 dirfd = os.open(dirname, os.O_RDONLY)
558 try:
559 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
560 os.utime(name, dir_fd=dirfd, ns=ns)
561 finally:
562 os.close(dirfd)
563 self._test_utime(set_time)
564
565 def test_utime_directory(self):
566 def set_time(filename, ns):
567 # test calling os.utime() on a directory
568 os.utime(filename, ns=ns)
569 self._test_utime(set_time, filename=self.dirname)
570
571 def _test_utime_current(self, set_time):
572 # Get the system clock
573 current = time.time()
574
575 # Call os.utime() to set the timestamp to the current system clock
576 set_time(self.fname)
577
578 if not self.support_subsecond(self.fname):
579 delta = 1.0
580 else:
581 # On Windows, the usual resolution of time.time() is 15.6 ms
582 delta = 0.020
583 st = os.stat(self.fname)
584 msg = ("st_time=%r, current=%r, dt=%r"
585 % (st.st_mtime, current, st.st_mtime - current))
586 self.assertAlmostEqual(st.st_mtime, current,
587 delta=delta, msg=msg)
588
589 def test_utime_current(self):
590 def set_time(filename):
591 # Set to the current time in the new way
592 os.utime(self.fname)
593 self._test_utime_current(set_time)
594
595 def test_utime_current_old(self):
596 def set_time(filename):
597 # Set to the current time in the old explicit way.
598 os.utime(self.fname, None)
599 self._test_utime_current(set_time)
600
601 def get_file_system(self, path):
602 if sys.platform == 'win32':
603 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
604 import ctypes
605 kernel32 = ctypes.windll.kernel32
606 buf = ctypes.create_unicode_buffer("", 100)
607 ok = kernel32.GetVolumeInformationW(root, None, 0,
608 None, None, None,
609 buf, len(buf))
610 if ok:
611 return buf.value
612 # return None if the filesystem is unknown
613
614 def test_large_time(self):
615 # Many filesystems are limited to the year 2038. At least, the test
616 # pass with NTFS filesystem.
617 if self.get_file_system(self.dirname) != "NTFS":
618 self.skipTest("requires NTFS")
619
620 large = 5000000000 # some day in 2128
621 os.utime(self.fname, (large, large))
622 self.assertEqual(os.stat(self.fname).st_mtime, large)
623
624 def test_utime_invalid_arguments(self):
625 # seconds and nanoseconds parameters are mutually exclusive
626 with self.assertRaises(ValueError):
627 os.utime(self.fname, (5, 5), ns=(5, 5))
628
629
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000630from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000631
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000632class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000633 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000634 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000635
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000636 def setUp(self):
637 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000638 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000639 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000640 for key, value in self._reference().items():
641 os.environ[key] = value
642
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000643 def tearDown(self):
644 os.environ.clear()
645 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000646 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000647 os.environb.clear()
648 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000649
Christian Heimes90333392007-11-01 19:08:42 +0000650 def _reference(self):
651 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
652
653 def _empty_mapping(self):
654 os.environ.clear()
655 return os.environ
656
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000657 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300658 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000659 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000660 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300661 os.environ.update(HELLO="World")
662 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
663 value = popen.read().strip()
664 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000665
Ezio Melottic7e139b2012-09-26 20:01:34 +0300666 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000667 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300668 with os.popen(
669 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
670 it = iter(popen)
671 self.assertEqual(next(it), "line1\n")
672 self.assertEqual(next(it), "line2\n")
673 self.assertEqual(next(it), "line3\n")
674 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000675
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000676 # Verify environ keys and values from the OS are of the
677 # correct str type.
678 def test_keyvalue_types(self):
679 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000680 self.assertEqual(type(key), str)
681 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000682
Christian Heimes90333392007-11-01 19:08:42 +0000683 def test_items(self):
684 for key, value in self._reference().items():
685 self.assertEqual(os.environ.get(key), value)
686
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000687 # Issue 7310
688 def test___repr__(self):
689 """Check that the repr() of os.environ looks like environ({...})."""
690 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000691 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
692 '{!r}: {!r}'.format(key, value)
693 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000694
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000695 def test_get_exec_path(self):
696 defpath_list = os.defpath.split(os.pathsep)
697 test_path = ['/monty', '/python', '', '/flying/circus']
698 test_env = {'PATH': os.pathsep.join(test_path)}
699
700 saved_environ = os.environ
701 try:
702 os.environ = dict(test_env)
703 # Test that defaulting to os.environ works.
704 self.assertSequenceEqual(test_path, os.get_exec_path())
705 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
706 finally:
707 os.environ = saved_environ
708
709 # No PATH environment variable
710 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
711 # Empty PATH environment variable
712 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
713 # Supplied PATH environment variable
714 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
715
Victor Stinnerb745a742010-05-18 17:17:23 +0000716 if os.supports_bytes_environ:
717 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000718 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000719 # ignore BytesWarning warning
720 with warnings.catch_warnings(record=True):
721 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000722 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000723 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000724 pass
725 else:
726 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000727
728 # bytes key and/or value
729 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
730 ['abc'])
731 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
732 ['abc'])
733 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
734 ['abc'])
735
736 @unittest.skipUnless(os.supports_bytes_environ,
737 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000738 def test_environb(self):
739 # os.environ -> os.environb
740 value = 'euro\u20ac'
741 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000742 value_bytes = value.encode(sys.getfilesystemencoding(),
743 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000744 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000745 msg = "U+20AC character is not encodable to %s" % (
746 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000747 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000748 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000749 self.assertEqual(os.environ['unicode'], value)
750 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000751
752 # os.environb -> os.environ
753 value = b'\xff'
754 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000755 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000756 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000757 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000758
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Check that deleting an invalid variable name from os.environ raises."""
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        bad_key, expected_error = 'x' * 50000, ValueError
    else:
        # "=" is not allowed in a variable name
        bad_key, expected_error = 'key=', OSError
    with self.assertRaises(expected_error):
        del os.environ[bad_key]
Victor Stinner60b385e2011-11-22 22:01:28 +0100772
Victor Stinner6d101392013-04-14 16:35:04 +0200773 def test_key_type(self):
774 missing = 'missingkey'
775 self.assertNotIn(missing, os.environ)
776
Victor Stinner839e5ea2013-04-14 16:43:03 +0200777 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200778 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200779 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200780 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200781
Victor Stinner839e5ea2013-04-14 16:43:03 +0200782 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200783 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200784 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200785 self.assertTrue(cm.exception.__suppress_context__)
786
Victor Stinner6d101392013-04-14 16:35:04 +0200787
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # so that both functions can be tested with the same code base.
    def walk(self, directory, **kwargs):
        """Walk *directory* with os.walk().

        A 'follow_symlinks' keyword is translated to os.walk()'s
        'followlinks' so that the tests can use a single spelling.
        """
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(directory, **kwargs)

    def setUp(self):
        """Create the directory tree that the tests walk."""
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # The with block closes the file even if the write fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        """Walk top-down and check every visited directory."""
        # Go through self.walk() rather than os.walk() so that subclasses
        # overriding the wrapper are exercised by this test as well.
        visited = list(self.walk(self.walk_path))

        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = visited[0][1][0] != "SUB1"
        visited[0][1].sort()
        visited[3 - 2 * flipped][-1].sort()
        self.assertEqual(visited[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(visited[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        """Removing a name from dirs while walking prunes that subtree."""
        visited = []
        for root, dirs, files in self.walk(self.walk_path):
            visited.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to visited!
                dirs.remove('SUB1')

        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0],
                         (self.walk_path, ["SUB2"], ["tmp1"]))

        visited[1][-1].sort()
        self.assertEqual(visited[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        """Walk bottom-up and check every visited directory."""
        visited = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = visited[3][1][0] != "SUB1"
        visited[3][1].sort()
        visited[2 - 2 * flipped][-1].sort()
        self.assertEqual(visited[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(visited[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        """Walking with follow_symlinks=True descends into the 'link' symlink."""
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)

    def test_walk_bad_dir(self):
        """A directory renamed during the walk is reported via onerror and skipped."""
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertFalse(errors)
        dir1 = dirs[0]
        dir1new = dir1 + '.new'
        os.rename(os.path.join(root, dir1), os.path.join(root, dir1new))
        roots = [r for r, d, f in walk_it]
        self.assertTrue(errors)
        self.assertNotIn(os.path.join(root, dir1), roots)
        self.assertNotIn(os.path.join(root, dir1new), roots)
        for dir2 in dirs[1:]:
            self.assertIn(os.path.join(root, dir2), roots)
941
Charles-François Natali7372b062012-02-05 15:15:38 +0100942
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, directory, **kwargs):
        """Walk *directory* with os.fwalk(), dropping the directory fd.

        Yields (root, dirs, files) triples so that the inherited tests
        can consume the results like those of os.walk().
        """
        for root, dirs, files, root_fd in os.fwalk(directory, **kwargs):
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """Compare os.fwalk() results with os.walk() results.

        Runs both functions for every combination of topdown and
        follow-symlinks settings. Every root yielded by fwalk() must
        have been yielded by walk() with the same directory and file names.
        """
        # Work on copies: the caller's dicts must not be modified.
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        """fwalk() relative to a directory fd yields the same tree as walk()."""
        # Open the descriptor before entering the try block: if os.open()
        # fails there is nothing to close, and the finally clause must not
        # hide the real error behind a NameError.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Do not follow symlinks: a link to a directory has to be
                # unlinked, not removed with rmdir().
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
1021
1022
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        """makedirs() on an existing directory fails unless exist_ok is true."""
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the umask even when an assertion fails, so that a failure
        # here cannot affect the tests that run afterwards.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        """makedirs(exist_ok=True) tolerates a differing S_ISGID bit."""
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        """makedirs() fails on an existing regular file, whatever exist_ok is."""
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even when an assertion fails; tearDown() expects
        # to find only directories below TESTFN.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1104
Andrew Svetlov405faed2012-12-25 12:18:09 +02001105
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a scratch directory at support.TESTFN."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        """Float, complex, Decimal and Fraction values are rejected as uid/gid."""
        info = os.stat(support.TESTFN)
        uid, gid = info.st_uid, info.st_gid
        bad_values = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in bad_values:
            with self.assertRaises(TypeError):
                os.chown(support.TESTFN, value, gid)
            with self.assertRaises(TypeError):
                os.chown(support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        """Changing the group to each of two groups is reflected by os.stat()."""
        owner = os.stat(support.TESTFN).st_uid
        for wanted_gid in groups[:2]:
            os.chown(support.TESTFN, owner, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        """Changing the owner to each of two users is reflected by os.stat()."""
        group = os.stat(support.TESTFN).st_gid
        for wanted_uid in all_users[:2]:
            os.chown(support.TESTFN, wanted_uid, group)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        """Changing the owner without privileges raises PermissionError."""
        first_uid, second_uid = all_users[:2]
        group = os.stat(support.TESTFN).st_gid
        with self.assertRaises(PermissionError):
            # Both calls share one assertRaises block, so it is enough for
            # either of them to fail (presumably because one of the two
            # users may be the current user -- TODO confirm).
            os.chown(support.TESTFN, first_uid, group)
            os.chown(support.TESTFN, second_uid, group)
1158
1159
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        """Create TESTFN/dira/dirb and return the two paths as (dira, dirb)."""
        outer = os.path.join(support.TESTFN, 'dira')
        inner = os.path.join(outer, 'dirb')
        os.mkdir(outer)
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        """With only empty directories, the whole chain is removed."""
        outer, inner = self._make_nested_dirs()
        os.removedirs(inner)
        for path in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(path))

    def test_remove_partial(self):
        """Removal stops at the first parent that is not empty."""
        outer, inner = self._make_nested_dirs()
        with open(os.path.join(outer, 'file.txt'), 'w') as f:
            f.write('text')
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for path in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(path))

    def test_remove_nothing(self):
        """A non-empty leaf directory raises OSError and nothing is removed."""
        outer, inner = self._make_nested_dirs()
        with open(os.path.join(inner, 'file.txt'), 'w') as f:
            f.write('text')
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for path in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(path))
1201
1202
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull device."""

    def test_devnull(self):
        """os.devnull accepts writes and yields no data when read."""
        with open(os.devnull, 'wb') as sink:
            sink.write(b'hello')
            # Explicit close inside the with block, kept from the original
            # test; the with statement closes the file again on exit.
            sink.close()
        with open(os.devnull, 'rb') as source:
            content = source.read()
        self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001210
Andrew Svetlov405faed2012-12-25 12:18:09 +02001211
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        """os.urandom(n) returns exactly n bytes."""
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        """Two consecutive calls return different data."""
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes produced by os.urandom() in a child interpreter.

        The child writes the bytes to its standard output; the length of
        the captured output is checked against *count* before returning.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare with the requested size rather than a fixed 16, so the
        # helper is correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        """Two separate child processes return different data."""
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001240
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001241
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as one of the build configuration variables below equals 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001248
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor."""

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if setrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # assert_python_ok() itself checks that the child exits successfully;
        # its return value is not needed here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        with open(support.TESTFN, 'wb') as f:
            f.write(b"x" * 256)
        self.addCleanup(os.unlink, support.TESTFN)
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        # The two 4-byte reads of one run must differ from each other ...
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        # ... and a second run must not reproduce the first one.
        self.assertNotEqual(out2, out)
1321
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001322
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager that replaces os.execv and os.execve with recording stubs.

    The value bound by the with statement is a list to which every stub call
    appends a tuple (function name, first arg, remaining args). The stubs
    always raise, because the real functions would normally never return.
    If defpath is not None, os.defpath is replaced by it as well. The
    original os.execv, os.execve and os.defpath are restored on exit.
    """
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    try:
        saved_execv, saved_execve = os.execv, os.execve
        saved_defpath = os.defpath
        os.execv, os.execve = fake_execv, fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv, os.execve = saved_execv, saved_execve
        os.defpath = saved_defpath
1355
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Check the calls os._execvpe() makes, for str or bytes program names.

        test_type is either str or bytes and selects the type of the
        program name. os.execv and os.execve are stubbed out, so only the
        recorded calls are inspected.
        """
        search_dir = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            argv = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            argv = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, argv)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (argv,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, argv, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (argv, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = search_dir
            with self.assertRaises(OSError):
                os._execvpe(program, argv, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (argv, env_path)))

    def test_internal_execvpe_str(self):
        """Run the internal check for str, and for bytes except on Windows."""
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001419
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001420
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Windows: failing path operations on support.TESTFN raise OSError."""

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir must fail while a regular file occupies the same name.
        stream = open(support.TESTFN, "w")
        try:
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)
        finally:
            # Close before unlinking: the file is removed only afterwards.
            stream.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001445
class TestInvalidFD(unittest.TestCase):
    """Check how fd-based os functions react to an invalid descriptor."""

    # Functions called with the descriptor as their only argument; a
    # test_<name> method is generated for each entry just below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    # close is omitted because it doesn't raise an exception on some platforms

    def get_single(f):
        # Build the test for os.<f>; it does nothing when os has no <f>.
        def helper(self):
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper
    for f in singles:
        # Inject the generated method into the class namespace being built.
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call ``f(bad_fd, *args)`` and require OSError with errno EBADF."""
        try:
            f(support.make_bad_fd(), *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
            return
        self.fail("%r didn't raise an OSError with a bad file descriptor"
                  % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() reports False for a bad descriptor instead of raising.
        bad_fd = support.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd + i)
            except OSError:
                continue
            # fd + i is a live descriptor: the invalid range stops before it.
            break
        if i < 2:
            self.skipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i - 1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both the path-or-fd and the fd-only variants get the bad fd.
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        self.check(os.readv, [bytearray(10)])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1544
Brian Curtin1b9df392010-11-24 20:24:31 +00001545
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        # Names only; the files themselves are created by _test_link().
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        for file in (self.file1, self.file2):
            if os.path.exists(file):
                os.unlink(file)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it as *file2*, and check both names
        open the same underlying file.

        *file1* and *file2* may be str or bytes paths.
        """
        with open(file1, "w") as f1:
            f1.write("test")

        # DeprecationWarning is silenced around the link call (presumably
        # for bytes paths on Windows -- confirm).
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding together with its
        # error handler, so any name the filesystem accepts can be encoded;
        # bytes(name, encoding) would use the strict handler instead.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # tearDown() removes whatever self.file1/self.file2 name at the end,
        # so the attributes are updated rather than using local names.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1582
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Error handling of the os.set*uid() / os.set*gid() functions.

    The group tests consult HAVE_WHEEL_GROUP, a flag defined elsewhere in
    this file.
    """

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1<<32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # The out-of-range value is rejected in either argument position.
        with self.assertRaises(OverflowError):
            os.setreuid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        code = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', code])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        with self.assertRaises(OverflowError):
            os.setregid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        code = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', code])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001638
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """os functions on a directory populated with non-ASCII file names."""

    def setUp(self):
        # Use the first non-empty candidate as the directory name.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # TESTFN_UNICODE is always tried; the other two only when set.
        candidates = [support.TESTFN_UNICODE]
        for optional in (support.TESTFN_UNENCODABLE, support.TESTFN_NONASCII):
            if optional:
                candidates.append(optional)

        # Keep only the names the filesystem encoding can represent.
        bytesfn = []
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so clean up here
            # before re-raising.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        found = set(os.listdir(self.dir))
        self.assertEqual(found, self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            implicit = set(os.listdir())
            explicit = set(os.listdir(os.sep))
            self.assertEqual(implicit, explicit)
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Each created file must be openable through its decoded name.
        for fn in self.unicodefn:
            with open(os.path.join(self.dir, fn), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, fn))

    def test_stat(self):
        for fn in self.unicodefn:
            fullname = os.path.join(self.dir, fn)
            os.stat(fullname)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001710
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        """Kill a ready child interpreter with *sig* and check its exit code.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready.  When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting.  This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1 s apart, for the ready message.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop ends without seeing the message.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and require it to exit.

        *name* is the event's display name, used in the failure message.
        The child runs win_console_handler.py and is expected to set the
        first byte of the shared tagged mmap to 1 once it is ready.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared mapping when the test ends, pass or fail.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; testing
        # truthiness would also misread an exit code of 0 as "still alive".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1825
1826
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Fill support.TESTFN with SUB0/SUB1 directories and FILE0/FILE1
        # files, and remember the entry names in sorted order.
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it.  Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)
        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)
1871
1872
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for file and directory symbolic links on Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions use assert* methods rather than bare ``assert`` so
        # they are still enforced when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): the dangling link's target does not exist, so exists()
        # would report False for it.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check stat() follows *link* to *target* while lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # Layout: level1/file1 and level1/level2/link -> file1 (relative),
        # with level1/level2/level3 used as a deeper working directory.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            # Register cleanup only once level1 exists, so a failure in any
            # later step can never be masked by the cleanup itself (the old
            # ``finally`` referenced names that might not be bound yet).
            # Removing the tree also removes file1 and the link.
            self.addCleanup(shutil.rmtree, level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from the dir above the link's dir
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir below the link's dir
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                # Always restore the working directory for later tests.
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
1990
Brian Curtind40e6f72010-07-08 21:39:08 +00001991
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for directory junctions created with _winapi.CreateJunction()."""

    # The junction name is relative, so it is created in the current
    # directory; it points at the directory containing this file.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Use assert* methods rather than bare ``assert`` so the
        # preconditions are still enforced when Python runs with -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2021
2022
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Symlinks whose target is given relative to the link's directory."""

    def setUp(self):
        # Raw docstring: the backslash in the diagram is literal text, not
        # the start of an escape sequence.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # assertTrue() instead of a bare ``assert``: the check must still run
        # under -O, otherwise this test would verify nothing.
        self.assertTrue(os.path.isdir(src))
2053
2054
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() come back
        # unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), b'abc\xff')
        self.assertEqual(os.fsdecode(text), 'abc\u0141')

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding: skip it.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002068
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002069
Brett Cannonefb00c02012-02-29 18:31:31 -05002070
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # An fd that doesn't actually exist must yield None.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win')
            or (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))
        ),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The encoding reported for fd 0 must be a codec name known to
        # the codecs registry.
        stdin_encoding = os.device_encoding(0)
        self.assertIsNotNone(stdin_encoding)
        self.assertTrue(codecs.lookup(stdin_encoding))
2084
2085
class PidTests(unittest.TestCase):
    """Checks for os.getppid() and os.waitpid()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        # A child that only runs "pass" must be reported as (pid, 0).
        argv = [sys.executable, '-c', 'pass']
        child_pid = os.spawnv(os.P_NOWAIT, argv[0], argv)
        result = os.waitpid(child_pid, 0)
        self.assertEqual(result, (child_pid, 0))
2101
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002102
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check of os.getlogin(); the class is skipped unconditionally."""

    def test_getlogin(self):
        # The login name must not be empty.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
2111
2112
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        which = os.PRIO_PROCESS
        base = os.getpriority(which, os.getpid())
        wanted = base + 1
        os.setpriority(which, os.getpid(), wanted)
        try:
            new_prio = os.getpriority(which, os.getpid())
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest(
                    "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, wanted)
        finally:
            # Try to put the original priority back; EACCES is tolerated,
            # any other failure is propagated.
            try:
                os.setpriority(which, os.getpid(), base)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
2135
2136
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Small asyncore TCP server that runs in its own thread.

        Each accepted connection is wrapped in a Handler which sends a
        "220 ready" greeting and stores every chunk it receives, so a test
        can compare the received bytes with what it sent.
        """

        class Handler(asynchat.async_chat):
            """Channel for one accepted connection; buffers received data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return all the data received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Propagate the exception currently being handled.
                raise

        def __init__(self, address):
            """Bind a listening socket to *address* (a (host, port) pair)."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Record the address actually bound (the caller may pass port 0).
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True while the serving loop is active."""
            return self._active

        def start(self):
            """Start the thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the serving loop to exit and wait for the thread to end."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # The "with" block releases the lock even when
                # asyncore.loop() raises (handle_error() re-raises).
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        # A read event on this dispatcher is handled the same way.
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Propagate the exception currently being handled.
            raise
2220
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002221
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002222@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002223@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2224class TestSendfile(unittest.TestCase):
2225
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002226 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002227 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002228 not sys.platform.startswith("solaris") and \
2229 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002230 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2231 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002232
2233 @classmethod
2234 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002235 cls.key = support.threading_setup()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002236 with open(support.TESTFN, "wb") as f:
2237 f.write(cls.DATA)
2238
2239 @classmethod
2240 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002241 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002242 support.unlink(support.TESTFN)
2243
2244 def setUp(self):
2245 self.server = SendfileTestServer((support.HOST, 0))
2246 self.server.start()
2247 self.client = socket.socket()
2248 self.client.connect((self.server.host, self.server.port))
2249 self.client.settimeout(1)
2250 # synchronize by waiting for "220 ready" response
2251 self.client.recv(1024)
2252 self.sockno = self.client.fileno()
2253 self.file = open(support.TESTFN, 'rb')
2254 self.fileno = self.file.fileno()
2255
2256 def tearDown(self):
2257 self.file.close()
2258 self.client.close()
2259 if self.server.running:
2260 self.server.stop()
2261
2262 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2263 """A higher level wrapper representing how an application is
2264 supposed to use sendfile().
2265 """
2266 while 1:
2267 try:
2268 if self.SUPPORT_HEADERS_TRAILERS:
2269 return os.sendfile(sock, file, offset, nbytes, headers,
2270 trailers)
2271 else:
2272 return os.sendfile(sock, file, offset, nbytes)
2273 except OSError as err:
2274 if err.errno == errno.ECONNRESET:
2275 # disconnected
2276 raise
2277 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2278 # we have to retry send data
2279 continue
2280 else:
2281 raise
2282
2283 def test_send_whole_file(self):
2284 # normal send
2285 total_sent = 0
2286 offset = 0
2287 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002288 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002289 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2290 if sent == 0:
2291 break
2292 offset += sent
2293 total_sent += sent
2294 self.assertTrue(sent <= nbytes)
2295 self.assertEqual(offset, total_sent)
2296
2297 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002298 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002299 self.client.close()
2300 self.server.wait()
2301 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002302 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002303 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002304
2305 def test_send_at_certain_offset(self):
2306 # start sending a file at a certain offset
2307 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002308 offset = len(self.DATA) // 2
2309 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002310 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002311 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002312 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2313 if sent == 0:
2314 break
2315 offset += sent
2316 total_sent += sent
2317 self.assertTrue(sent <= nbytes)
2318
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002319 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002320 self.client.close()
2321 self.server.wait()
2322 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002323 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002324 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002325 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002326 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002327
2328 def test_offset_overflow(self):
2329 # specify an offset > file size
2330 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002331 try:
2332 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2333 except OSError as e:
2334 # Solaris can raise EINVAL if offset >= file length, ignore.
2335 if e.errno != errno.EINVAL:
2336 raise
2337 else:
2338 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002339 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002340 self.client.close()
2341 self.server.wait()
2342 data = self.server.handler_instance.get_data()
2343 self.assertEqual(data, b'')
2344
2345 def test_invalid_offset(self):
2346 with self.assertRaises(OSError) as cm:
2347 os.sendfile(self.sockno, self.fileno, -1, 4096)
2348 self.assertEqual(cm.exception.errno, errno.EINVAL)
2349
Martin Panterbf19d162015-09-09 01:01:13 +00002350 def test_keywords(self):
2351 # Keyword arguments should be supported
2352 os.sendfile(out=self.sockno, offset=0, count=4096,
2353 **{'in': self.fileno})
2354 if self.SUPPORT_HEADERS_TRAILERS:
2355 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002356 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002357
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002358 # --- headers / trailers tests
2359
Serhiy Storchaka43767632013-11-03 21:31:38 +02002360 @requires_headers_trailers
2361 def test_headers(self):
2362 total_sent = 0
2363 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2364 headers=[b"x" * 512])
2365 total_sent += sent
2366 offset = 4096
2367 nbytes = 4096
2368 while 1:
2369 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2370 offset, nbytes)
2371 if sent == 0:
2372 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002373 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002374 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002375
Serhiy Storchaka43767632013-11-03 21:31:38 +02002376 expected_data = b"x" * 512 + self.DATA
2377 self.assertEqual(total_sent, len(expected_data))
2378 self.client.close()
2379 self.server.wait()
2380 data = self.server.handler_instance.get_data()
2381 self.assertEqual(hash(data), hash(expected_data))
2382
2383 @requires_headers_trailers
2384 def test_trailers(self):
2385 TESTFN2 = support.TESTFN + "2"
2386 file_data = b"abcdef"
2387 with open(TESTFN2, 'wb') as f:
2388 f.write(file_data)
2389 with open(TESTFN2, 'rb')as f:
2390 self.addCleanup(os.remove, TESTFN2)
2391 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2392 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002393 self.client.close()
2394 self.server.wait()
2395 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002396 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002397
Serhiy Storchaka43767632013-11-03 21:31:38 +02002398 @requires_headers_trailers
2399 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2400 'test needs os.SF_NODISKIO')
2401 def test_flags(self):
2402 try:
2403 os.sendfile(self.sockno, self.fileno, 0, 4096,
2404 flags=os.SF_NODISKIO)
2405 except OSError as err:
2406 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2407 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002408
2409
def supports_extended_attributes():
    """Return True if extended attributes are usable for the tests.

    The result is False when os.setxattr() is missing, when setting a
    "user.test" attribute on a scratch file fails with OSError, or when
    platform.release() reports a 2.6.x kernel older than 2.6.39.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string: "\d" is not a valid str escape, and the dots are escaped
    # so they only match literal dots of the version number.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2425
2426
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Checks for os.getxattr/setxattr/removexattr/listxattr."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # *s* converts an attribute name (str) to the type under test;
        # the four callables are the xattr functions being checked and
        # **kwargs is forwarded to get/set/remove (not to listxattr).
        path = support.TESTFN
        with open(path, "wb"):
            pass
        first = s("user.test")
        second = s("user.test2")

        def expect_errno(code, func, *args):
            # func(path, *args, **kwargs) must raise OSError with errno *code*.
            with self.assertRaises(OSError) as caught:
                func(path, *args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        expect_errno(errno.ENODATA, getxattr, first)
        initial = listxattr(path)
        self.assertIsInstance(initial, list)
        setxattr(path, first, b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, first, b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")
        # XATTR_CREATE on an existing name, XATTR_REPLACE on a missing one.
        expect_errno(errno.EEXIST, setxattr, first, b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr, second, b"bye", os.XATTR_REPLACE)
        setxattr(path, second, b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, first, **kwargs)
        expect_errno(errno.ENODATA, getxattr, first)
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, second, **kwargs), b"foo")
        big_value = b"a"*1024
        setxattr(path, first, big_value, **kwargs)
        self.assertEqual(getxattr(path, first, **kwargs), big_value)
        removexattr(path, first, **kwargs)
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario with str names, then again with bytes names.
        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(lambda text: text.encode("ascii"),
                               *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call goes through a file descriptor
        # obtained by opening the path.
        def getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb") as stream:
                os.setxattr(stream.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb") as stream:
                os.removexattr(stream.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2502
2503
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Bytes filenames must raise DeprecationWarning in the listed calls."""

    def test_deprecated(self):
        import nt
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            # Turn the warning into an exception so assertRaises sees it.
            warnings.simplefilter("error", DeprecationWarning)
            calls = [
                (nt._getfullpathname, bytes_name),
                (nt._isdir, bytes_name),
                (os.access, bytes_name, os.R_OK),
                (os.chdir, bytes_name),
                (os.chmod, bytes_name, 0o777),
                (os.getcwdb,),
                (os.link, bytes_name, bytes_name),
                (os.listdir, bytes_name),
                (os.lstat, bytes_name),
                (os.mkdir, bytes_name),
                (os.open, bytes_name, os.O_RDONLY),
                (os.rename, bytes_name, bytes_name),
                (os.rmdir, bytes_name),
                (os.startfile, bytes_name),
                (os.stat, bytes_name),
                (os.unlink, bytes_name),
                (os.utime, bytes_name),
            ]
            for call in calls:
                func, args = call[0], call[1:]
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(bytes_name, bytes_name)
2539
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002540
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _size_or_skip(self, query):
        # Return query(); skip the running test when it fails with an
        # OSError on win32 or with errno EINVAL/ENOTTY, re-raise otherwise.
        try:
            return query()
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._size_or_skip(lambda: os.get_terminal_size())

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            output = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        fields = output.decode().split()
        # stty prints the two numbers in the reverse order of the
        # (columns, lines) tuple.
        expected = (int(fields[1]), int(fields[0]))

        actual = self._size_or_skip(
            lambda: os.get_terminal_size(sys.__stdin__.fileno()))
        self.assertEqual(expected, actual)
2583
2584
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed to the call."""

    def setUp(self):
        class Str(str):
            pass

        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        # Each list holds the plain object plus a second object of a
        # different type built from it.
        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"
        every_name = self.filenames
        byte_names = self.bytes_filenames
        text_names = self.unicode_filenames

        # Each entry: (filenames to try, function, *extra arguments).
        funcs = [
            (every_name, os.chdir,),
            (every_name, os.chmod, 0o777),
            (every_name, os.lstat,),
            (every_name, os.open, os.O_RDONLY),
            (every_name, os.rmdir,),
            (every_name, os.stat,),
            (every_name, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (byte_names, os.rename, b"dst"),
                (byte_names, os.replace, b"dst"),
                (text_names, os.rename, "dst"),
                (text_names, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (text_names, os.listdir,),
            ]
        else:
            funcs += [
                (every_name, os.listdir,),
                (every_name, os.rename, "dst"),
                (every_name, os.replace, "dst"),
            ]

        # Functions that exist only on some platforms, with their extra
        # positional arguments.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every_name, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((byte_names, os.link, b"dst"))
                funcs.append((text_names, os.link, "dst"))
            else:
                funcs.append((every_name, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every_name, os.listxattr,),
                (every_name, os.getxattr, "user.test"),
                (every_name, os.setxattr, "user.test", b'user'),
                (every_name, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every_name, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            readlink_names = text_names if on_windows else every_name
            funcs.append((readlink_names, os.readlink,))

        for candidates, func, *extra_args in funcs:
            for candidate in candidates:
                try:
                    func(candidate, *extra_args)
                except OSError as exc:
                    self.assertIs(exc.filename, candidate)
                else:
                    self.fail("No exception thrown by {}".format(func))
2681
class CPUCountTests(unittest.TestCase):
    """Check of os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        # A known count must be a positive int.
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2690
Victor Stinnerdaf45552013-08-28 00:53:59 +02002691
2692class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002693 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002694 fd = os.open(__file__, os.O_RDONLY)
2695 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002696 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002697
Victor Stinnerdaf45552013-08-28 00:53:59 +02002698 os.set_inheritable(fd, True)
2699 self.assertEqual(os.get_inheritable(fd), True)
2700
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002701 @unittest.skipIf(fcntl is None, "need fcntl")
2702 def test_get_inheritable_cloexec(self):
2703 fd = os.open(__file__, os.O_RDONLY)
2704 self.addCleanup(os.close, fd)
2705 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002706
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002707 # clear FD_CLOEXEC flag
2708 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2709 flags &= ~fcntl.FD_CLOEXEC
2710 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002711
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002712 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002713
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002714 @unittest.skipIf(fcntl is None, "need fcntl")
2715 def test_set_inheritable_cloexec(self):
2716 fd = os.open(__file__, os.O_RDONLY)
2717 self.addCleanup(os.close, fd)
2718 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2719 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002720
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002721 os.set_inheritable(fd, True)
2722 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2723 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002724
Victor Stinnerdaf45552013-08-28 00:53:59 +02002725 def test_open(self):
2726 fd = os.open(__file__, os.O_RDONLY)
2727 self.addCleanup(os.close, fd)
2728 self.assertEqual(os.get_inheritable(fd), False)
2729
2730 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2731 def test_pipe(self):
2732 rfd, wfd = os.pipe()
2733 self.addCleanup(os.close, rfd)
2734 self.addCleanup(os.close, wfd)
2735 self.assertEqual(os.get_inheritable(rfd), False)
2736 self.assertEqual(os.get_inheritable(wfd), False)
2737
2738 def test_dup(self):
2739 fd1 = os.open(__file__, os.O_RDONLY)
2740 self.addCleanup(os.close, fd1)
2741
2742 fd2 = os.dup(fd1)
2743 self.addCleanup(os.close, fd2)
2744 self.assertEqual(os.get_inheritable(fd2), False)
2745
@unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
def test_dup2(self):
    source_fd = os.open(__file__, os.O_RDONLY)
    self.addCleanup(os.close, source_fd)

    # Each scenario is (extra keyword arguments for os.dup2(), expected
    # inheritable flag of the target descriptor afterwards).
    scenarios = (
        ({}, True),                         # inheritable by default
        ({'inheritable': False}, False),    # force non-inheritable
    )
    for dup2_kwargs, expected in scenarios:
        target_fd = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source_fd, target_fd, **dup2_kwargs)
            self.assertEqual(os.get_inheritable(target_fd), expected)
        finally:
            # Close the target right away, whatever the outcome.
            os.close(target_fd)
2766
@unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
def test_openpty(self):
    # Both descriptors returned by os.openpty() are expected to be
    # non-inheritable.
    pty_fds = os.openpty()
    for pty_fd in pty_fds:
        self.addCleanup(os.close, pty_fd)
    for pty_fd in pty_fds:
        self.assertEqual(os.get_inheritable(pty_fd), False)
2774
2775
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Tests for os.get_blocking() and os.set_blocking()."""

    def test_blocking(self):
        # A freshly opened descriptor starts out in blocking mode.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back again; get_blocking() has to
        # report each change.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
2789
2790
Yury Selivanov97e2e062014-09-26 12:33:06 -04002791
class ExportsTests(unittest.TestCase):
    """Checks on the public names listed in os.__all__."""

    def test_os_all(self):
        # Spot-check two functions that must be exported by the module.
        for exported_name in ('open', 'walk'):
            self.assertIn(exported_name, os.__all__)
2796
2797
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the DirEntry objects it yields.

    setUp() creates a scratch directory (support.TESTFN) for every test
    and registers its removal as a cleanup.
    """

    def setUp(self):
        self.path = os.path.realpath(support.TESTFN)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create a small binary file named *name* in the scratch directory.

        Return the full path of the new file.
        """
        filename = os.path.join(self.path, name)
        with open(filename, "wb") as fp:
            fp.write(b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return a {name: DirEntry} dict.

        Fail unless the scanned names are exactly *names*; the order in
        which *names* is given does not matter.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), sorted(names))
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Fail unless the stat results *stat1* and *stat2* are equal.

        If *skip_fields* is true, the st_* attributes are compared one by
        one and st_dev, st_ino and st_nlink are ignored; otherwise the two
        objects are compared as a whole.
        """
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return

        for attr in dir(stat1):
            if not attr.startswith("st_"):
                continue
            if attr in ("st_dev", "st_ino", "st_nlink"):
                continue
            self.assertEqual(getattr(stat1, attr),
                             getattr(stat2, attr),
                             (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check *entry* against the caller's expectations and os.stat().

        *is_dir* and *is_file* are the expected results of entry.is_dir()
        and entry.is_file() when symbolic links are followed; *is_symlink*
        is the expected result of entry.is_symlink().
        """
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Explicit expectations supplied by the caller.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        # Cross-check with what os.stat() reports (symlinks followed).
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        # Same cross-check without following symlinks.
        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # On Windows st_dev, st_ino and st_nlink are left out of the
        # comparison, except for entry.stat() on a symbolic link.
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def check_removed_entry(self, entry):
        """Check inode() and stat() of an entry whose file was removed."""
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")

        # name -> (is_dir, is_file, is_symlink)
        expected = {
            'dir': (True, False, False),
            'file.txt': (False, True, False),
        }
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
            expected['link_file.txt'] = (False, True, False)
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
            expected['symlink_dir'] = (True, False, True)
            expected['symlink_file.txt'] = (False, True, True)

        names = sorted(expected)
        entries = self.get_entries(names)
        for name in names:
            self.check_entry(entries[name], name, *expected[name])

    def get_entry(self, name):
        """Return the single entry of the scratch directory.

        Fail unless the directory holds exactly one entry named *name*.
        """
        entries = list(os.scandir(self.path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self):
        """Create the default file and return its DirEntry."""
        filename = self.create_file()
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            names = sorted(entry.name for entry in os.scandir())
            self.assertEqual(names, [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self.check_removed_entry(entry)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self.check_removed_entry(entry)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises unittest.SkipTest, nothing runs after it.
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        if os.name == "nt":
            # On Windows, os.scandir(bytes) must raise an exception
            self.assertRaises(TypeError, os.scandir, b'.')
            return

        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        # A bytes path must produce bytes names and bytes paths.
        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise an exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1234, 1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)
3020
3021
# Run every test case of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()