blob: 15040c3639b8d02e163142f597af2d5a508f7855 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import platform
19import re
Victor Stinner47aacc82015-06-12 17:26:23 +020020import shutil
21import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000022import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000032try:
33 import threading
34except ImportError:
35 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020036try:
37 import resource
38except ImportError:
39 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020040try:
41 import fcntl
42except ImportError:
43 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010044try:
45 import _winapi
46except ImportError:
47 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020048try:
R David Murrayf2ad1732014-12-25 18:36:56 -050049 import grp
50 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
51 if hasattr(os, 'getgid'):
52 process_gid = os.getgid()
53 if process_gid not in groups:
54 groups.append(process_gid)
55except ImportError:
56 groups = []
57try:
58 import pwd
59 all_users = [u.pw_uid for u in pwd.getpwall()]
60except ImportError:
61 all_users = []
62try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020064except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020065 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020066
Berker Peksagce643912015-05-06 06:33:17 +030067from test.support.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000068
R David Murrayf2ad1732014-12-25 18:36:56 -050069root_in_posix = False
70if hasattr(os, 'geteuid'):
71 root_in_posix = (os.geteuid() == 0)
72
Mark Dickinson7cf03892010-04-16 13:45:35 +000073# Detect whether we're on a Linux system that uses the (now outdated
74# and unmaintained) linuxthreads threading library. There's an issue
75# when combining linuxthreads with a failed execv call: see
76# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020077if hasattr(sys, 'thread_info') and sys.thread_info.version:
78 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
79else:
80 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000081
Stefan Krahebee49a2013-01-17 15:31:00 +010082# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
83HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84
Thomas Wouters0e3f5912006-08-11 14:57:12 +000085# Tests creating TESTFN
86class FileTests(unittest.TestCase):
87 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +000088 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000089 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000090 tearDown = setUp
91
92 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000093 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000094 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000095 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000096
Christian Heimesfdab48e2008-01-20 09:06:41 +000097 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000098 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
99 # We must allocate two consecutive file descriptors, otherwise
100 # it will mess up other file descriptors (perhaps even the three
101 # standard ones).
102 second = os.dup(first)
103 try:
104 retries = 0
105 while second != first + 1:
106 os.close(first)
107 retries += 1
108 if retries > 10:
109 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000110 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000111 first, second = second, os.dup(second)
112 finally:
113 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000114 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000115 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000116 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000117
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000118 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000119 def test_rename(self):
120 path = support.TESTFN
121 old = sys.getrefcount(path)
122 self.assertRaises(TypeError, os.rename, path, 0)
123 new = sys.getrefcount(path)
124 self.assertEqual(old, new)
125
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000126 def test_read(self):
127 with open(support.TESTFN, "w+b") as fobj:
128 fobj.write(b"spam")
129 fobj.flush()
130 fd = fobj.fileno()
131 os.lseek(fd, 0, 0)
132 s = os.read(fd, 4)
133 self.assertEqual(type(s), bytes)
134 self.assertEqual(s, b"spam")
135
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200136 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200137 # Skip the test on 32-bit platforms: the number of bytes must fit in a
138 # Py_ssize_t type
139 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
140 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200141 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
142 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200143 with open(support.TESTFN, "wb") as fp:
144 fp.write(b'test')
145 self.addCleanup(support.unlink, support.TESTFN)
146
147 # Issue #21932: Make sure that os.read() does not raise an
148 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200149 with open(support.TESTFN, "rb") as fp:
150 data = os.read(fp.fileno(), size)
151
152 # The test does not try to read more than 2 GB at once because the
153 # operating system is free to return less bytes than requested.
154 self.assertEqual(data, b'test')
155
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000156 def test_write(self):
157 # os.write() accepts bytes- and buffer-like objects but not strings
158 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
159 self.assertRaises(TypeError, os.write, fd, "beans")
160 os.write(fd, b"bacon\n")
161 os.write(fd, bytearray(b"eggs\n"))
162 os.write(fd, memoryview(b"spam\n"))
163 os.close(fd)
164 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000165 self.assertEqual(fobj.read().splitlines(),
166 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000167
Victor Stinnere0daff12011-03-20 23:36:35 +0100168 def write_windows_console(self, *args):
169 retcode = subprocess.call(args,
170 # use a new console to not flood the test output
171 creationflags=subprocess.CREATE_NEW_CONSOLE,
172 # use a shell to hide the console window (SW_HIDE)
173 shell=True)
174 self.assertEqual(retcode, 0)
175
176 @unittest.skipUnless(sys.platform == 'win32',
177 'test specific to the Windows console')
178 def test_write_windows_console(self):
179 # Issue #11395: the Windows console returns an error (12: not enough
180 # space error) on writing into stdout if stdout mode is binary and the
181 # length is greater than 66,000 bytes (or less, depending on heap
182 # usage).
183 code = "print('x' * 100000)"
184 self.write_windows_console(sys.executable, "-c", code)
185 self.write_windows_console(sys.executable, "-u", "-c", code)
186
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000187 def fdopen_helper(self, *args):
188 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200189 f = os.fdopen(fd, *args)
190 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000191
192 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200193 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
194 os.close(fd)
195
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000196 self.fdopen_helper()
197 self.fdopen_helper('r')
198 self.fdopen_helper('r', 100)
199
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100200 def test_replace(self):
201 TESTFN2 = support.TESTFN + ".2"
202 with open(support.TESTFN, 'w') as f:
203 f.write("1")
204 with open(TESTFN2, 'w') as f:
205 f.write("2")
206 self.addCleanup(os.unlink, TESTFN2)
207 os.replace(support.TESTFN, TESTFN2)
208 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
209 with open(TESTFN2, 'r') as f:
210 self.assertEqual(f.read(), "1")
211
Martin Panterbf19d162015-09-09 01:01:13 +0000212 def test_open_keywords(self):
213 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
214 dir_fd=None)
215 os.close(f)
216
217 def test_symlink_keywords(self):
218 symlink = support.get_attribute(os, "symlink")
219 try:
220 symlink(src='target', dst=support.TESTFN,
221 target_is_directory=False, dir_fd=None)
222 except (NotImplementedError, OSError):
223 pass # No OS support or unprivileged user
224
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200225
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000226# Test attributes on return values from os.*stat* family.
227class StatAttributeTests(unittest.TestCase):
228 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200229 self.fname = support.TESTFN
230 self.addCleanup(support.unlink, self.fname)
231 with open(self.fname, 'wb') as fp:
232 fp.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000233
Serhiy Storchaka43767632013-11-03 21:31:38 +0200234 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000235 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000236 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000237
238 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000239 self.assertEqual(result[stat.ST_SIZE], 3)
240 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000241
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000242 # Make sure all the attributes are there
243 members = dir(result)
244 for name in dir(stat):
245 if name[:3] == 'ST_':
246 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000247 if name.endswith("TIME"):
248 def trunc(x): return int(x)
249 else:
250 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000251 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000252 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000253 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000254
Larry Hastings6fe20b32012-04-19 15:07:49 -0700255 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700256 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700257 for name in 'st_atime st_mtime st_ctime'.split():
258 floaty = int(getattr(result, name) * 100000)
259 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700260 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700261
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000262 try:
263 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200264 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000265 except IndexError:
266 pass
267
268 # Make sure that assignment fails
269 try:
270 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200271 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000272 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000273 pass
274
275 try:
276 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200277 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000278 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000279 pass
280
281 try:
282 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200283 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284 except AttributeError:
285 pass
286
287 # Use the stat_result constructor with a too-short tuple.
288 try:
289 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200290 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000291 except TypeError:
292 pass
293
Ezio Melotti42da6632011-03-15 05:18:48 +0200294 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000295 try:
296 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
297 except TypeError:
298 pass
299
Antoine Pitrou38425292010-09-21 18:19:07 +0000300 def test_stat_attributes(self):
301 self.check_stat_attributes(self.fname)
302
303 def test_stat_attributes_bytes(self):
304 try:
305 fname = self.fname.encode(sys.getfilesystemencoding())
306 except UnicodeEncodeError:
307 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100308 with warnings.catch_warnings():
309 warnings.simplefilter("ignore", DeprecationWarning)
310 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000311
Christian Heimes25827622013-10-12 01:27:08 +0200312 def test_stat_result_pickle(self):
313 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200314 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
315 p = pickle.dumps(result, proto)
316 self.assertIn(b'stat_result', p)
317 if proto < 4:
318 self.assertIn(b'cos\nstat_result\n', p)
319 unpickled = pickle.loads(p)
320 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200321
Serhiy Storchaka43767632013-11-03 21:31:38 +0200322 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000323 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000324 try:
325 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000326 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000327 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000328 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200329 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000330
331 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000332 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000333
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000334 # Make sure all the attributes are there.
335 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
336 'ffree', 'favail', 'flag', 'namemax')
337 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000338 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000339
340 # Make sure that assignment really fails
341 try:
342 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200343 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000344 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000345 pass
346
347 try:
348 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200349 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000350 except AttributeError:
351 pass
352
353 # Use the constructor with a too-short tuple.
354 try:
355 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200356 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000357 except TypeError:
358 pass
359
Ezio Melotti42da6632011-03-15 05:18:48 +0200360 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000361 try:
362 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
363 except TypeError:
364 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000365
Christian Heimes25827622013-10-12 01:27:08 +0200366 @unittest.skipUnless(hasattr(os, 'statvfs'),
367 "need os.statvfs()")
368 def test_statvfs_result_pickle(self):
369 try:
370 result = os.statvfs(self.fname)
371 except OSError as e:
372 # On AtheOS, glibc always returns ENOSYS
373 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200374 self.skipTest('os.statvfs() failed with ENOSYS')
375
Serhiy Storchakabad12572014-12-15 14:03:42 +0200376 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
377 p = pickle.dumps(result, proto)
378 self.assertIn(b'statvfs_result', p)
379 if proto < 4:
380 self.assertIn(b'cos\nstatvfs_result\n', p)
381 unpickled = pickle.loads(p)
382 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200383
Serhiy Storchaka43767632013-11-03 21:31:38 +0200384 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
385 def test_1686475(self):
386 # Verify that an open file can be stat'ed
387 try:
388 os.stat(r"c:\pagefile.sys")
389 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600390 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200391 except OSError as e:
392 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000393
Serhiy Storchaka43767632013-11-03 21:31:38 +0200394 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
395 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
396 def test_15261(self):
397 # Verify that stat'ing a closed fd does not cause crash
398 r, w = os.pipe()
399 try:
400 os.stat(r) # should not raise error
401 finally:
402 os.close(r)
403 os.close(w)
404 with self.assertRaises(OSError) as ctx:
405 os.stat(r)
406 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100407
Zachary Ware63f277b2014-06-19 09:46:37 -0500408 def check_file_attributes(self, result):
409 self.assertTrue(hasattr(result, 'st_file_attributes'))
410 self.assertTrue(isinstance(result.st_file_attributes, int))
411 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
412
413 @unittest.skipUnless(sys.platform == "win32",
414 "st_file_attributes is Win32 specific")
415 def test_file_attributes(self):
416 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
417 result = os.stat(self.fname)
418 self.check_file_attributes(result)
419 self.assertEqual(
420 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
421 0)
422
423 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200424 dirname = support.TESTFN + "dir"
425 os.mkdir(dirname)
426 self.addCleanup(os.rmdir, dirname)
427
428 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500429 self.check_file_attributes(result)
430 self.assertEqual(
431 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
432 stat.FILE_ATTRIBUTE_DIRECTORY)
433
Victor Stinner47aacc82015-06-12 17:26:23 +0200434
435class UtimeTests(unittest.TestCase):
436 def setUp(self):
437 self.dirname = support.TESTFN
438 self.fname = os.path.join(self.dirname, "f1")
439
440 self.addCleanup(support.rmtree, self.dirname)
441 os.mkdir(self.dirname)
442 with open(self.fname, 'wb') as fp:
443 fp.write(b"ABC")
444
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200445 def restore_float_times(state):
446 with warnings.catch_warnings():
447 warnings.simplefilter("ignore", DeprecationWarning)
448
449 os.stat_float_times(state)
450
Victor Stinner47aacc82015-06-12 17:26:23 +0200451 # ensure that st_atime and st_mtime are float
452 with warnings.catch_warnings():
453 warnings.simplefilter("ignore", DeprecationWarning)
454
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200455 old_float_times = os.stat_float_times(-1)
456 self.addCleanup(restore_float_times, old_float_times)
Victor Stinner47aacc82015-06-12 17:26:23 +0200457
458 os.stat_float_times(True)
459
460 def support_subsecond(self, filename):
461 # Heuristic to check if the filesystem supports timestamp with
462 # subsecond resolution: check if float and int timestamps are different
463 st = os.stat(filename)
464 return ((st.st_atime != st[7])
465 or (st.st_mtime != st[8])
466 or (st.st_ctime != st[9]))
467
468 def _test_utime(self, set_time, filename=None):
469 if not filename:
470 filename = self.fname
471
472 support_subsecond = self.support_subsecond(filename)
473 if support_subsecond:
474 # Timestamp with a resolution of 1 microsecond (10^-6).
475 #
476 # The resolution of the C internal function used by os.utime()
477 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
478 # test with a resolution of 1 ns requires more work:
479 # see the issue #15745.
480 atime_ns = 1002003000 # 1.002003 seconds
481 mtime_ns = 4005006000 # 4.005006 seconds
482 else:
483 # use a resolution of 1 second
484 atime_ns = 5 * 10**9
485 mtime_ns = 8 * 10**9
486
487 set_time(filename, (atime_ns, mtime_ns))
488 st = os.stat(filename)
489
490 if support_subsecond:
491 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
492 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
493 else:
494 self.assertEqual(st.st_atime, atime_ns * 1e-9)
495 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
496 self.assertEqual(st.st_atime_ns, atime_ns)
497 self.assertEqual(st.st_mtime_ns, mtime_ns)
498
499 def test_utime(self):
500 def set_time(filename, ns):
501 # test the ns keyword parameter
502 os.utime(filename, ns=ns)
503 self._test_utime(set_time)
504
505 @staticmethod
506 def ns_to_sec(ns):
507 # Convert a number of nanosecond (int) to a number of seconds (float).
508 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
509 # issue, os.utime() rounds towards minus infinity.
510 return (ns * 1e-9) + 0.5e-9
511
512 def test_utime_by_indexed(self):
513 # pass times as floating point seconds as the second indexed parameter
514 def set_time(filename, ns):
515 atime_ns, mtime_ns = ns
516 atime = self.ns_to_sec(atime_ns)
517 mtime = self.ns_to_sec(mtime_ns)
518 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
519 # or utime(time_t)
520 os.utime(filename, (atime, mtime))
521 self._test_utime(set_time)
522
523 def test_utime_by_times(self):
524 def set_time(filename, ns):
525 atime_ns, mtime_ns = ns
526 atime = self.ns_to_sec(atime_ns)
527 mtime = self.ns_to_sec(mtime_ns)
528 # test the times keyword parameter
529 os.utime(filename, times=(atime, mtime))
530 self._test_utime(set_time)
531
532 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
533 "follow_symlinks support for utime required "
534 "for this test.")
535 def test_utime_nofollow_symlinks(self):
536 def set_time(filename, ns):
537 # use follow_symlinks=False to test utimensat(timespec)
538 # or lutimes(timeval)
539 os.utime(filename, ns=ns, follow_symlinks=False)
540 self._test_utime(set_time)
541
542 @unittest.skipUnless(os.utime in os.supports_fd,
543 "fd support for utime required for this test.")
544 def test_utime_fd(self):
545 def set_time(filename, ns):
546 with open(filename, 'wb') as fp:
547 # use a file descriptor to test futimens(timespec)
548 # or futimes(timeval)
549 os.utime(fp.fileno(), ns=ns)
550 self._test_utime(set_time)
551
552 @unittest.skipUnless(os.utime in os.supports_dir_fd,
553 "dir_fd support for utime required for this test.")
554 def test_utime_dir_fd(self):
555 def set_time(filename, ns):
556 dirname, name = os.path.split(filename)
557 dirfd = os.open(dirname, os.O_RDONLY)
558 try:
559 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
560 os.utime(name, dir_fd=dirfd, ns=ns)
561 finally:
562 os.close(dirfd)
563 self._test_utime(set_time)
564
565 def test_utime_directory(self):
566 def set_time(filename, ns):
567 # test calling os.utime() on a directory
568 os.utime(filename, ns=ns)
569 self._test_utime(set_time, filename=self.dirname)
570
571 def _test_utime_current(self, set_time):
572 # Get the system clock
573 current = time.time()
574
575 # Call os.utime() to set the timestamp to the current system clock
576 set_time(self.fname)
577
578 if not self.support_subsecond(self.fname):
579 delta = 1.0
580 else:
581 # On Windows, the usual resolution of time.time() is 15.6 ms
582 delta = 0.020
583 st = os.stat(self.fname)
584 msg = ("st_time=%r, current=%r, dt=%r"
585 % (st.st_mtime, current, st.st_mtime - current))
586 self.assertAlmostEqual(st.st_mtime, current,
587 delta=delta, msg=msg)
588
589 def test_utime_current(self):
590 def set_time(filename):
591 # Set to the current time in the new way
592 os.utime(self.fname)
593 self._test_utime_current(set_time)
594
595 def test_utime_current_old(self):
596 def set_time(filename):
597 # Set to the current time in the old explicit way.
598 os.utime(self.fname, None)
599 self._test_utime_current(set_time)
600
601 def get_file_system(self, path):
602 if sys.platform == 'win32':
603 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
604 import ctypes
605 kernel32 = ctypes.windll.kernel32
606 buf = ctypes.create_unicode_buffer("", 100)
607 ok = kernel32.GetVolumeInformationW(root, None, 0,
608 None, None, None,
609 buf, len(buf))
610 if ok:
611 return buf.value
612 # return None if the filesystem is unknown
613
614 def test_large_time(self):
615 # Many filesystems are limited to the year 2038. At least, the test
616 # pass with NTFS filesystem.
617 if self.get_file_system(self.dirname) != "NTFS":
618 self.skipTest("requires NTFS")
619
620 large = 5000000000 # some day in 2128
621 os.utime(self.fname, (large, large))
622 self.assertEqual(os.stat(self.fname).st_mtime, large)
623
624 def test_utime_invalid_arguments(self):
625 # seconds and nanoseconds parameters are mutually exclusive
626 with self.assertRaises(ValueError):
627 os.utime(self.fname, (5, 5), ns=(5, 5))
628
629
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000630from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000631
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000632class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000633 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000634 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000635
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000636 def setUp(self):
637 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000638 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000639 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000640 for key, value in self._reference().items():
641 os.environ[key] = value
642
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000643 def tearDown(self):
644 os.environ.clear()
645 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000646 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000647 os.environb.clear()
648 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000649
Christian Heimes90333392007-11-01 19:08:42 +0000650 def _reference(self):
651 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
652
653 def _empty_mapping(self):
654 os.environ.clear()
655 return os.environ
656
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000657 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300658 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000659 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000660 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300661 os.environ.update(HELLO="World")
662 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
663 value = popen.read().strip()
664 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000665
Ezio Melottic7e139b2012-09-26 20:01:34 +0300666 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000667 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300668 with os.popen(
669 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
670 it = iter(popen)
671 self.assertEqual(next(it), "line1\n")
672 self.assertEqual(next(it), "line2\n")
673 self.assertEqual(next(it), "line3\n")
674 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000675
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000676 # Verify environ keys and values from the OS are of the
677 # correct str type.
678 def test_keyvalue_types(self):
679 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000680 self.assertEqual(type(key), str)
681 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000682
Christian Heimes90333392007-11-01 19:08:42 +0000683 def test_items(self):
684 for key, value in self._reference().items():
685 self.assertEqual(os.environ.get(key), value)
686
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000687 # Issue 7310
688 def test___repr__(self):
689 """Check that the repr() of os.environ looks like environ({...})."""
690 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000691 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
692 '{!r}: {!r}'.format(key, value)
693 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000694
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000695 def test_get_exec_path(self):
696 defpath_list = os.defpath.split(os.pathsep)
697 test_path = ['/monty', '/python', '', '/flying/circus']
698 test_env = {'PATH': os.pathsep.join(test_path)}
699
700 saved_environ = os.environ
701 try:
702 os.environ = dict(test_env)
703 # Test that defaulting to os.environ works.
704 self.assertSequenceEqual(test_path, os.get_exec_path())
705 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
706 finally:
707 os.environ = saved_environ
708
709 # No PATH environment variable
710 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
711 # Empty PATH environment variable
712 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
713 # Supplied PATH environment variable
714 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
715
Victor Stinnerb745a742010-05-18 17:17:23 +0000716 if os.supports_bytes_environ:
717 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000718 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000719 # ignore BytesWarning warning
720 with warnings.catch_warnings(record=True):
721 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000722 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000723 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000724 pass
725 else:
726 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000727
728 # bytes key and/or value
729 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
730 ['abc'])
731 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
732 ['abc'])
733 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
734 ['abc'])
735
736 @unittest.skipUnless(os.supports_bytes_environ,
737 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000738 def test_environb(self):
739 # os.environ -> os.environb
740 value = 'euro\u20ac'
741 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000742 value_bytes = value.encode(sys.getfilesystemencoding(),
743 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000744 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000745 msg = "U+20AC character is not encodable to %s" % (
746 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000747 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000748 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000749 self.assertEqual(os.environ['unicode'], value)
750 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000751
752 # os.environb -> os.environ
753 value = b'\xff'
754 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000755 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000756 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000757 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000758
Charles-François Natali2966f102011-11-26 11:32:46 +0100759 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
760 # #13415).
761 @support.requires_freebsd_version(7)
762 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100763 def test_unset_error(self):
764 if sys.platform == "win32":
765 # an environment variable is limited to 32,767 characters
766 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100767 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100768 else:
769 # "=" is not allowed in a variable name
770 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100771 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100772
Victor Stinner6d101392013-04-14 16:35:04 +0200773 def test_key_type(self):
774 missing = 'missingkey'
775 self.assertNotIn(missing, os.environ)
776
Victor Stinner839e5ea2013-04-14 16:43:03 +0200777 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200778 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200779 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200780 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200781
Victor Stinner839e5ea2013-04-14 16:43:03 +0200782 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200783 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200784 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200785 self.assertTrue(cm.exception.__suppress_context__)
786
Victor Stinner6d101392013-04-14 16:35:04 +0200787
Tim Petersc4e09402003-04-25 07:11:48 +0000788class WalkTests(unittest.TestCase):
789 """Tests for os.walk()."""
790
Victor Stinner0561c532015-03-12 10:28:24 +0100791 # Wrapper to hide minor differences between os.walk and os.fwalk
792 # to tests both functions with the same code base
793 def walk(self, directory, topdown=True, follow_symlinks=False):
794 walk_it = os.walk(directory,
795 topdown=topdown,
796 followlinks=follow_symlinks)
797 for root, dirs, files in walk_it:
798 yield (root, dirs, files)
799
Charles-François Natali7372b062012-02-05 15:15:38 +0100800 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100801 join = os.path.join
Tim Petersc4e09402003-04-25 07:11:48 +0000802
803 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000804 # TESTFN/
805 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000806 # tmp1
807 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000808 # tmp2
809 # SUB11/ no kids
810 # SUB2/ a file kid and a dirsymlink kid
811 # tmp3
812 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200813 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000814 # TEST2/
815 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100816 self.walk_path = join(support.TESTFN, "TEST1")
817 self.sub1_path = join(self.walk_path, "SUB1")
818 self.sub11_path = join(self.sub1_path, "SUB11")
819 sub2_path = join(self.walk_path, "SUB2")
820 tmp1_path = join(self.walk_path, "tmp1")
821 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000822 tmp3_path = join(sub2_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100823 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000824 t2_path = join(support.TESTFN, "TEST2")
825 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200826 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000827
828 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100829 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000830 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000831 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100832
Guido van Rossumd8faa362007-04-27 19:54:29 +0000833 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000834 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000835 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
836 f.close()
837
Victor Stinner0561c532015-03-12 10:28:24 +0100838 if support.can_symlink():
839 os.symlink(os.path.abspath(t2_path), self.link_path)
840 os.symlink('broken', broken_link_path, True)
841 self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
842 else:
843 self.sub2_tree = (sub2_path, [], ["tmp3"])
844
845 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000846 # Walk top-down.
Victor Stinner0561c532015-03-12 10:28:24 +0100847 all = list(os.walk(self.walk_path))
848
Tim Petersc4e09402003-04-25 07:11:48 +0000849 self.assertEqual(len(all), 4)
850 # We can't know which order SUB1 and SUB2 will appear in.
851 # Not flipped: TESTFN, SUB1, SUB11, SUB2
852 # flipped: TESTFN, SUB2, SUB1, SUB11
853 flipped = all[0][1][0] != "SUB1"
854 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200855 all[3 - 2 * flipped][-1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100856 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
857 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
858 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
859 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000860
Victor Stinner0561c532015-03-12 10:28:24 +0100861 def test_walk_prune(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000862 # Prune the search.
863 all = []
Victor Stinner0561c532015-03-12 10:28:24 +0100864 for root, dirs, files in self.walk(self.walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000865 all.append((root, dirs, files))
866 # Don't descend into SUB1.
867 if 'SUB1' in dirs:
868 # Note that this also mutates the dirs we appended to all!
869 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000870
Victor Stinner0561c532015-03-12 10:28:24 +0100871 self.assertEqual(len(all), 2)
872 self.assertEqual(all[0],
873 (self.walk_path, ["SUB2"], ["tmp1"]))
874
875 all[1][-1].sort()
876 self.assertEqual(all[1], self.sub2_tree)
877
878 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000879 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100880 all = list(self.walk(self.walk_path, topdown=False))
881
Tim Petersc4e09402003-04-25 07:11:48 +0000882 self.assertEqual(len(all), 4)
883 # We can't know which order SUB1 and SUB2 will appear in.
884 # Not flipped: SUB11, SUB1, SUB2, TESTFN
885 # flipped: SUB2, SUB11, SUB1, TESTFN
886 flipped = all[3][1][0] != "SUB1"
887 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200888 all[2 - 2 * flipped][-1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100889 self.assertEqual(all[3],
890 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
891 self.assertEqual(all[flipped],
892 (self.sub11_path, [], []))
893 self.assertEqual(all[flipped + 1],
894 (self.sub1_path, ["SUB11"], ["tmp2"]))
895 self.assertEqual(all[2 - 2 * flipped],
896 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000897
Victor Stinner0561c532015-03-12 10:28:24 +0100898 def test_walk_symlink(self):
899 if not support.can_symlink():
900 self.skipTest("need symlink support")
901
902 # Walk, following symlinks.
903 walk_it = self.walk(self.walk_path, follow_symlinks=True)
904 for root, dirs, files in walk_it:
905 if root == self.link_path:
906 self.assertEqual(dirs, [])
907 self.assertEqual(files, ["tmp4"])
908 break
909 else:
910 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000911
912 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000913 # Tear everything down. This is a decent use for bottom-up on
914 # Windows, which doesn't have a recursive delete command. The
915 # (not so) subtlety is that rmdir will fail unless the dir's
916 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000917 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000918 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000919 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000920 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000921 dirname = os.path.join(root, name)
922 if not os.path.islink(dirname):
923 os.rmdir(dirname)
924 else:
925 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000926 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000927
Charles-François Natali7372b062012-02-05 15:15:38 +0100928
929@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
930class FwalkTests(WalkTests):
931 """Tests for os.fwalk()."""
932
Victor Stinner0561c532015-03-12 10:28:24 +0100933 def walk(self, directory, topdown=True, follow_symlinks=False):
934 walk_it = os.fwalk(directory,
935 topdown=topdown,
936 follow_symlinks=follow_symlinks)
937 for root, dirs, files, root_fd in walk_it:
938 yield (root, dirs, files)
939
940
Larry Hastingsc48fe982012-06-25 04:49:05 -0700941 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
942 """
943 compare with walk() results.
944 """
Larry Hastingsb4038062012-07-15 10:57:38 -0700945 walk_kwargs = walk_kwargs.copy()
946 fwalk_kwargs = fwalk_kwargs.copy()
947 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
948 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
949 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -0700950
Charles-François Natali7372b062012-02-05 15:15:38 +0100951 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -0700952 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100953 expected[root] = (set(dirs), set(files))
954
Larry Hastingsc48fe982012-06-25 04:49:05 -0700955 for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100956 self.assertIn(root, expected)
957 self.assertEqual(expected[root], (set(dirs), set(files)))
958
Larry Hastingsc48fe982012-06-25 04:49:05 -0700959 def test_compare_to_walk(self):
960 kwargs = {'top': support.TESTFN}
961 self._compare_to_walk(kwargs, kwargs)
962
Charles-François Natali7372b062012-02-05 15:15:38 +0100963 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -0700964 try:
965 fd = os.open(".", os.O_RDONLY)
966 walk_kwargs = {'top': support.TESTFN}
967 fwalk_kwargs = walk_kwargs.copy()
968 fwalk_kwargs['dir_fd'] = fd
969 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
970 finally:
971 os.close(fd)
972
973 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +0100974 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -0700975 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
976 args = support.TESTFN, topdown, None
977 for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +0100978 # check that the FD is valid
979 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700980 # redundant check
981 os.stat(rootfd)
982 # check that listdir() returns consistent information
983 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100984
985 def test_fd_leak(self):
986 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
987 # we both check that calling fwalk() a large number of times doesn't
988 # yield EMFILE, and that the minimum allocated FD hasn't changed.
989 minfd = os.dup(1)
990 os.close(minfd)
991 for i in range(256):
992 for x in os.fwalk(support.TESTFN):
993 pass
994 newfd = os.dup(1)
995 self.addCleanup(os.close, newfd)
996 self.assertEqual(newfd, minfd)
997
998 def tearDown(self):
999 # cleanup
1000 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
1001 for name in files:
Larry Hastings9cf065c2012-06-22 16:30:09 -07001002 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +01001003 for name in dirs:
Larry Hastings9cf065c2012-06-22 16:30:09 -07001004 st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
Larry Hastingsb698d8e2012-06-23 16:55:07 -07001005 if stat.S_ISDIR(st.st_mode):
1006 os.rmdir(name, dir_fd=rootfd)
1007 else:
1008 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +01001009 os.rmdir(support.TESTFN)
1010
1011
Guido van Rossume7ba4952007-06-06 23:52:48 +00001012class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001013 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001014 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001015
1016 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001017 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001018 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1019 os.makedirs(path) # Should work
1020 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1021 os.makedirs(path)
1022
1023 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001024 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001025 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1026 os.makedirs(path)
1027 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1028 'dir5', 'dir6')
1029 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001030
Terry Reedy5a22b652010-12-02 07:05:56 +00001031 def test_exist_ok_existing_directory(self):
1032 path = os.path.join(support.TESTFN, 'dir1')
1033 mode = 0o777
1034 old_mask = os.umask(0o022)
1035 os.makedirs(path, mode)
1036 self.assertRaises(OSError, os.makedirs, path, mode)
1037 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001038 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001039 os.makedirs(path, mode=mode, exist_ok=True)
1040 os.umask(old_mask)
1041
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001042 def test_exist_ok_s_isgid_directory(self):
1043 path = os.path.join(support.TESTFN, 'dir1')
1044 S_ISGID = stat.S_ISGID
1045 mode = 0o777
1046 old_mask = os.umask(0o022)
1047 try:
1048 existing_testfn_mode = stat.S_IMODE(
1049 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001050 try:
1051 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001052 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001053 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001054 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1055 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1056 # The os should apply S_ISGID from the parent dir for us, but
1057 # this test need not depend on that behavior. Be explicit.
1058 os.makedirs(path, mode | S_ISGID)
1059 # http://bugs.python.org/issue14992
1060 # Should not fail when the bit is already set.
1061 os.makedirs(path, mode, exist_ok=True)
1062 # remove the bit.
1063 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001064 # May work even when the bit is not already set when demanded.
1065 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001066 finally:
1067 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001068
1069 def test_exist_ok_existing_regular_file(self):
1070 base = support.TESTFN
1071 path = os.path.join(support.TESTFN, 'dir1')
1072 f = open(path, 'w')
1073 f.write('abc')
1074 f.close()
1075 self.assertRaises(OSError, os.makedirs, path)
1076 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1077 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1078 os.remove(path)
1079
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001080 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001081 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001082 'dir4', 'dir5', 'dir6')
1083 # If the tests failed, the bottom-most directory ('../dir6')
1084 # may not have been created, so we look for the outermost directory
1085 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001086 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001087 path = os.path.dirname(path)
1088
1089 os.removedirs(path)
1090
Andrew Svetlov405faed2012-12-25 12:18:09 +02001091
R David Murrayf2ad1732014-12-25 18:36:56 -05001092@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1093class ChownFileTests(unittest.TestCase):
1094
Berker Peksag036a71b2015-07-21 09:29:48 +03001095 @classmethod
1096 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001097 os.mkdir(support.TESTFN)
1098
1099 def test_chown_uid_gid_arguments_must_be_index(self):
1100 stat = os.stat(support.TESTFN)
1101 uid = stat.st_uid
1102 gid = stat.st_gid
1103 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1104 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1105 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1106 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1107 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1108
1109 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1110 def test_chown(self):
1111 gid_1, gid_2 = groups[:2]
1112 uid = os.stat(support.TESTFN).st_uid
1113 os.chown(support.TESTFN, uid, gid_1)
1114 gid = os.stat(support.TESTFN).st_gid
1115 self.assertEqual(gid, gid_1)
1116 os.chown(support.TESTFN, uid, gid_2)
1117 gid = os.stat(support.TESTFN).st_gid
1118 self.assertEqual(gid, gid_2)
1119
1120 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1121 "test needs root privilege and more than one user")
1122 def test_chown_with_root(self):
1123 uid_1, uid_2 = all_users[:2]
1124 gid = os.stat(support.TESTFN).st_gid
1125 os.chown(support.TESTFN, uid_1, gid)
1126 uid = os.stat(support.TESTFN).st_uid
1127 self.assertEqual(uid, uid_1)
1128 os.chown(support.TESTFN, uid_2, gid)
1129 uid = os.stat(support.TESTFN).st_uid
1130 self.assertEqual(uid, uid_2)
1131
1132 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1133 "test needs non-root account and more than one user")
1134 def test_chown_without_permission(self):
1135 uid_1, uid_2 = all_users[:2]
1136 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001137 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001138 os.chown(support.TESTFN, uid_1, gid)
1139 os.chown(support.TESTFN, uid_2, gid)
1140
Berker Peksag036a71b2015-07-21 09:29:48 +03001141 @classmethod
1142 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001143 os.rmdir(support.TESTFN)
1144
1145
Andrew Svetlov405faed2012-12-25 12:18:09 +02001146class RemoveDirsTests(unittest.TestCase):
1147 def setUp(self):
1148 os.makedirs(support.TESTFN)
1149
1150 def tearDown(self):
1151 support.rmtree(support.TESTFN)
1152
1153 def test_remove_all(self):
1154 dira = os.path.join(support.TESTFN, 'dira')
1155 os.mkdir(dira)
1156 dirb = os.path.join(dira, 'dirb')
1157 os.mkdir(dirb)
1158 os.removedirs(dirb)
1159 self.assertFalse(os.path.exists(dirb))
1160 self.assertFalse(os.path.exists(dira))
1161 self.assertFalse(os.path.exists(support.TESTFN))
1162
1163 def test_remove_partial(self):
1164 dira = os.path.join(support.TESTFN, 'dira')
1165 os.mkdir(dira)
1166 dirb = os.path.join(dira, 'dirb')
1167 os.mkdir(dirb)
1168 with open(os.path.join(dira, 'file.txt'), 'w') as f:
1169 f.write('text')
1170 os.removedirs(dirb)
1171 self.assertFalse(os.path.exists(dirb))
1172 self.assertTrue(os.path.exists(dira))
1173 self.assertTrue(os.path.exists(support.TESTFN))
1174
1175 def test_remove_nothing(self):
1176 dira = os.path.join(support.TESTFN, 'dira')
1177 os.mkdir(dira)
1178 dirb = os.path.join(dira, 'dirb')
1179 os.mkdir(dirb)
1180 with open(os.path.join(dirb, 'file.txt'), 'w') as f:
1181 f.write('text')
1182 with self.assertRaises(OSError):
1183 os.removedirs(dirb)
1184 self.assertTrue(os.path.exists(dirb))
1185 self.assertTrue(os.path.exists(dira))
1186 self.assertTrue(os.path.exists(support.TESTFN))
1187
1188
Guido van Rossume7ba4952007-06-06 23:52:48 +00001189class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001190 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +02001191 with open(os.devnull, 'wb') as f:
1192 f.write(b'hello')
1193 f.close()
1194 with open(os.devnull, 'rb') as f:
1195 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001196
Andrew Svetlov405faed2012-12-25 12:18:09 +02001197
Guido van Rossume7ba4952007-06-06 23:52:48 +00001198class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001199 def test_urandom_length(self):
1200 self.assertEqual(len(os.urandom(0)), 0)
1201 self.assertEqual(len(os.urandom(1)), 1)
1202 self.assertEqual(len(os.urandom(10)), 10)
1203 self.assertEqual(len(os.urandom(100)), 100)
1204 self.assertEqual(len(os.urandom(1000)), 1000)
1205
1206 def test_urandom_value(self):
1207 data1 = os.urandom(16)
1208 data2 = os.urandom(16)
1209 self.assertNotEqual(data1, data2)
1210
1211 def get_urandom_subprocess(self, count):
1212 code = '\n'.join((
1213 'import os, sys',
1214 'data = os.urandom(%s)' % count,
1215 'sys.stdout.buffer.write(data)',
1216 'sys.stdout.buffer.flush()'))
1217 out = assert_python_ok('-c', code)
1218 stdout = out[1]
1219 self.assertEqual(len(stdout), 16)
1220 return stdout
1221
1222 def test_urandom_subprocess(self):
1223 data1 = self.get_urandom_subprocess(16)
1224 data2 = self.get_urandom_subprocess(16)
1225 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001226
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001227
1228HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)
Victor Stinner9eb57c52015-03-19 22:21:49 +01001229HAVE_GETRANDOM = (sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001230
1231@unittest.skipIf(HAVE_GETENTROPY,
1232 "getentropy() does not use a file descriptor")
Victor Stinner9eb57c52015-03-19 22:21:49 +01001233@unittest.skipIf(HAVE_GETRANDOM,
1234 "getrandom() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001235class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001236 @unittest.skipUnless(resource, "test requires the resource module")
1237 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001238 # Check urandom() failing when it is not able to open /dev/random.
1239 # We spawn a new process to make the test more robust (if getrlimit()
1240 # failed to restore the file descriptor limit after this, the whole
1241 # test suite would crash; this actually happened on the OS X Tiger
1242 # buildbot).
1243 code = """if 1:
1244 import errno
1245 import os
1246 import resource
1247
1248 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1249 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1250 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001251 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001252 except OSError as e:
1253 assert e.errno == errno.EMFILE, e.errno
1254 else:
1255 raise AssertionError("OSError not raised")
1256 """
1257 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001258
Antoine Pitroue472aea2014-04-26 14:33:03 +02001259 def test_urandom_fd_closed(self):
1260 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1261 # closed.
1262 code = """if 1:
1263 import os
1264 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001265 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001266 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001267 with test.support.SuppressCrashReport():
1268 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001269 sys.stdout.buffer.write(os.urandom(4))
1270 """
1271 rc, out, err = assert_python_ok('-Sc', code)
1272
1273 def test_urandom_fd_reopened(self):
1274 # Issue #21207: urandom() should detect its fd to /dev/urandom
1275 # changed to something else, and reopen it.
1276 with open(support.TESTFN, 'wb') as f:
1277 f.write(b"x" * 256)
1278 self.addCleanup(os.unlink, support.TESTFN)
1279 code = """if 1:
1280 import os
1281 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001282 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001283 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001284 with test.support.SuppressCrashReport():
1285 for fd in range(3, 256):
1286 try:
1287 os.close(fd)
1288 except OSError:
1289 pass
1290 else:
1291 # Found the urandom fd (XXX hopefully)
1292 break
1293 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001294 with open({TESTFN!r}, 'rb') as f:
1295 os.dup2(f.fileno(), fd)
1296 sys.stdout.buffer.write(os.urandom(4))
1297 sys.stdout.buffer.write(os.urandom(4))
1298 """.format(TESTFN=support.TESTFN)
1299 rc, out, err = assert_python_ok('-Sc', code)
1300 self.assertEqual(len(out), 8)
1301 self.assertNotEqual(out[0:4], out[4:8])
1302 rc, out2, err2 = assert_python_ok('-Sc', code)
1303 self.assertEqual(len(out2), 8)
1304 self.assertNotEqual(out2, out)
1305
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001306
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001307@contextlib.contextmanager
1308def _execvpe_mockup(defpath=None):
1309 """
1310 Stubs out execv and execve functions when used as context manager.
1311 Records exec calls. The mock execv and execve functions always raise an
1312 exception as they would normally never return.
1313 """
1314 # A list of tuples containing (function name, first arg, args)
1315 # of calls to execv or execve that have been made.
1316 calls = []
1317
1318 def mock_execv(name, *args):
1319 calls.append(('execv', name, args))
1320 raise RuntimeError("execv called")
1321
1322 def mock_execve(name, *args):
1323 calls.append(('execve', name, args))
1324 raise OSError(errno.ENOTDIR, "execve called")
1325
1326 try:
1327 orig_execv = os.execv
1328 orig_execve = os.execve
1329 orig_defpath = os.defpath
1330 os.execv = mock_execv
1331 os.execve = mock_execve
1332 if defpath is not None:
1333 os.defpath = defpath
1334 yield calls
1335 finally:
1336 os.execv = orig_execv
1337 os.execve = orig_execve
1338 os.defpath = orig_defpath
1339
Guido van Rossume7ba4952007-06-06 23:52:48 +00001340class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001341 @unittest.skipIf(USING_LINUXTHREADS,
1342 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001343 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001344 self.assertRaises(OSError, os.execvpe, 'no such app-',
1345 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001346
Thomas Heller6790d602007-08-30 17:15:14 +00001347 def test_execvpe_with_bad_arglist(self):
1348 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1349
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001350 @unittest.skipUnless(hasattr(os, '_execvpe'),
1351 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001352 def _test_internal_execvpe(self, test_type):
1353 program_path = os.sep + 'absolutepath'
1354 if test_type is bytes:
1355 program = b'executable'
1356 fullpath = os.path.join(os.fsencode(program_path), program)
1357 native_fullpath = fullpath
1358 arguments = [b'progname', 'arg1', 'arg2']
1359 else:
1360 program = 'executable'
1361 arguments = ['progname', 'arg1', 'arg2']
1362 fullpath = os.path.join(program_path, program)
1363 if os.name != "nt":
1364 native_fullpath = os.fsencode(fullpath)
1365 else:
1366 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001367 env = {'spam': 'beans'}
1368
Victor Stinnerb745a742010-05-18 17:17:23 +00001369 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001370 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001371 self.assertRaises(RuntimeError,
1372 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001373 self.assertEqual(len(calls), 1)
1374 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1375
Victor Stinnerb745a742010-05-18 17:17:23 +00001376 # test os._execvpe() with a relative path:
1377 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001378 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001379 self.assertRaises(OSError,
1380 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001381 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001382 self.assertSequenceEqual(calls[0],
1383 ('execve', native_fullpath, (arguments, env)))
1384
1385 # test os._execvpe() with a relative path:
1386 # os.get_exec_path() reads the 'PATH' variable
1387 with _execvpe_mockup() as calls:
1388 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001389 if test_type is bytes:
1390 env_path[b'PATH'] = program_path
1391 else:
1392 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001393 self.assertRaises(OSError,
1394 os._execvpe, program, arguments, env=env_path)
1395 self.assertEqual(len(calls), 1)
1396 self.assertSequenceEqual(calls[0],
1397 ('execve', native_fullpath, (arguments, env_path)))
1398
1399 def test_internal_execvpe_str(self):
1400 self._test_internal_execvpe(str)
1401 if os.name != "nt":
1402 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001403
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001404
Serhiy Storchaka43767632013-11-03 21:31:38 +02001405@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001406class Win32ErrorTests(unittest.TestCase):
1407 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001408 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001409
1410 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001411 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001412
1413 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001414 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001415
1416 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001417 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001418 try:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001419 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Benjamin Petersonf91df042009-02-13 02:50:59 +00001420 finally:
1421 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001422 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001423
1424 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001425 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001426
Thomas Wouters477c8d52006-05-27 19:21:47 +00001427 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001428 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001429
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001430class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001431 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001432 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1433 #singles.append("close")
1434 #We omit close because it doesn'r raise an exception on some platforms
1435 def get_single(f):
1436 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001437 if hasattr(os, f):
1438 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001439 return helper
1440 for f in singles:
1441 locals()["test_"+f] = get_single(f)
1442
Benjamin Peterson7522c742009-01-19 21:00:09 +00001443 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001444 try:
1445 f(support.make_bad_fd(), *args)
1446 except OSError as e:
1447 self.assertEqual(e.errno, errno.EBADF)
1448 else:
1449 self.fail("%r didn't raise a OSError with a bad file descriptor"
1450 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001451
Serhiy Storchaka43767632013-11-03 21:31:38 +02001452 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001453 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001454 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001455
Serhiy Storchaka43767632013-11-03 21:31:38 +02001456 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001457 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001458 fd = support.make_bad_fd()
1459 # Make sure none of the descriptors we are about to close are
1460 # currently valid (issue 6542).
1461 for i in range(10):
1462 try: os.fstat(fd+i)
1463 except OSError:
1464 pass
1465 else:
1466 break
1467 if i < 2:
1468 raise unittest.SkipTest(
1469 "Unable to acquire a range of invalid file descriptors")
1470 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001471
Serhiy Storchaka43767632013-11-03 21:31:38 +02001472 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001473 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001474 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001475
Serhiy Storchaka43767632013-11-03 21:31:38 +02001476 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001477 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001478 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001479
Serhiy Storchaka43767632013-11-03 21:31:38 +02001480 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001481 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001482 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001483
Serhiy Storchaka43767632013-11-03 21:31:38 +02001484 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001485 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001486 self.check(os.pathconf, "PC_NAME_MAX")
1487 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001488
Serhiy Storchaka43767632013-11-03 21:31:38 +02001489 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001490 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001491 self.check(os.truncate, 0)
1492 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001493
Serhiy Storchaka43767632013-11-03 21:31:38 +02001494 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001495 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001496 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001497
Serhiy Storchaka43767632013-11-03 21:31:38 +02001498 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001499 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001500 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001501
Victor Stinner57ddf782014-01-08 15:21:28 +01001502 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1503 def test_readv(self):
1504 buf = bytearray(10)
1505 self.check(os.readv, [buf])
1506
Serhiy Storchaka43767632013-11-03 21:31:38 +02001507 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001508 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001509 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001510
Serhiy Storchaka43767632013-11-03 21:31:38 +02001511 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001512 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001513 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001514
Victor Stinner57ddf782014-01-08 15:21:28 +01001515 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1516 def test_writev(self):
1517 self.check(os.writev, [b'abc'])
1518
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001519 def test_inheritable(self):
1520 self.check(os.get_inheritable)
1521 self.check(os.set_inheritable, True)
1522
1523 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1524 'needs os.get_blocking() and os.set_blocking()')
1525 def test_blocking(self):
1526 self.check(os.get_blocking)
1527 self.check(os.set_blocking, True)
1528
Brian Curtin1b9df392010-11-24 20:24:31 +00001529
1530class LinkTests(unittest.TestCase):
1531 def setUp(self):
1532 self.file1 = support.TESTFN
1533 self.file2 = os.path.join(support.TESTFN + "2")
1534
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001535 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001536 for file in (self.file1, self.file2):
1537 if os.path.exists(file):
1538 os.unlink(file)
1539
Brian Curtin1b9df392010-11-24 20:24:31 +00001540 def _test_link(self, file1, file2):
1541 with open(file1, "w") as f1:
1542 f1.write("test")
1543
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001544 with warnings.catch_warnings():
1545 warnings.simplefilter("ignore", DeprecationWarning)
1546 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001547 with open(file1, "r") as f1, open(file2, "r") as f2:
1548 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1549
1550 def test_link(self):
1551 self._test_link(self.file1, self.file2)
1552
1553 def test_link_bytes(self):
1554 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1555 bytes(self.file2, sys.getfilesystemencoding()))
1556
Brian Curtinf498b752010-11-30 15:54:04 +00001557 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001558 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001559 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001560 except UnicodeError:
1561 raise unittest.SkipTest("Unable to encode for this platform.")
1562
Brian Curtinf498b752010-11-30 15:54:04 +00001563 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001564 self.file2 = self.file1 + "2"
1565 self._test_link(self.file1, self.file2)
1566
Serhiy Storchaka43767632013-11-03 21:31:38 +02001567@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1568class PosixUidGidTests(unittest.TestCase):
1569 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1570 def test_setuid(self):
1571 if os.getuid() != 0:
1572 self.assertRaises(OSError, os.setuid, 0)
1573 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001574
Serhiy Storchaka43767632013-11-03 21:31:38 +02001575 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1576 def test_setgid(self):
1577 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1578 self.assertRaises(OSError, os.setgid, 0)
1579 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001580
Serhiy Storchaka43767632013-11-03 21:31:38 +02001581 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1582 def test_seteuid(self):
1583 if os.getuid() != 0:
1584 self.assertRaises(OSError, os.seteuid, 0)
1585 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001586
Serhiy Storchaka43767632013-11-03 21:31:38 +02001587 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1588 def test_setegid(self):
1589 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1590 self.assertRaises(OSError, os.setegid, 0)
1591 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001592
Serhiy Storchaka43767632013-11-03 21:31:38 +02001593 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1594 def test_setreuid(self):
1595 if os.getuid() != 0:
1596 self.assertRaises(OSError, os.setreuid, 0, 0)
1597 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1598 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001599
Serhiy Storchaka43767632013-11-03 21:31:38 +02001600 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1601 def test_setreuid_neg1(self):
1602 # Needs to accept -1. We run this in a subprocess to avoid
1603 # altering the test runner's process state (issue8045).
1604 subprocess.check_call([
1605 sys.executable, '-c',
1606 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001607
Serhiy Storchaka43767632013-11-03 21:31:38 +02001608 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1609 def test_setregid(self):
1610 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1611 self.assertRaises(OSError, os.setregid, 0, 0)
1612 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1613 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001614
Serhiy Storchaka43767632013-11-03 21:31:38 +02001615 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1616 def test_setregid_neg1(self):
1617 # Needs to accept -1. We run this in a subprocess to avoid
1618 # altering the test runner's process state (issue8045).
1619 subprocess.check_call([
1620 sys.executable, '-c',
1621 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001622
Serhiy Storchaka43767632013-11-03 21:31:38 +02001623@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1624class Pep383Tests(unittest.TestCase):
1625 def setUp(self):
1626 if support.TESTFN_UNENCODABLE:
1627 self.dir = support.TESTFN_UNENCODABLE
1628 elif support.TESTFN_NONASCII:
1629 self.dir = support.TESTFN_NONASCII
1630 else:
1631 self.dir = support.TESTFN
1632 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001633
Serhiy Storchaka43767632013-11-03 21:31:38 +02001634 bytesfn = []
1635 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001636 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001637 fn = os.fsencode(fn)
1638 except UnicodeEncodeError:
1639 return
1640 bytesfn.append(fn)
1641 add_filename(support.TESTFN_UNICODE)
1642 if support.TESTFN_UNENCODABLE:
1643 add_filename(support.TESTFN_UNENCODABLE)
1644 if support.TESTFN_NONASCII:
1645 add_filename(support.TESTFN_NONASCII)
1646 if not bytesfn:
1647 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001648
Serhiy Storchaka43767632013-11-03 21:31:38 +02001649 self.unicodefn = set()
1650 os.mkdir(self.dir)
1651 try:
1652 for fn in bytesfn:
1653 support.create_empty_file(os.path.join(self.bdir, fn))
1654 fn = os.fsdecode(fn)
1655 if fn in self.unicodefn:
1656 raise ValueError("duplicate filename")
1657 self.unicodefn.add(fn)
1658 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001659 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001660 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001661
Serhiy Storchaka43767632013-11-03 21:31:38 +02001662 def tearDown(self):
1663 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001664
Serhiy Storchaka43767632013-11-03 21:31:38 +02001665 def test_listdir(self):
1666 expected = self.unicodefn
1667 found = set(os.listdir(self.dir))
1668 self.assertEqual(found, expected)
1669 # test listdir without arguments
1670 current_directory = os.getcwd()
1671 try:
1672 os.chdir(os.sep)
1673 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1674 finally:
1675 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001676
Serhiy Storchaka43767632013-11-03 21:31:38 +02001677 def test_open(self):
1678 for fn in self.unicodefn:
1679 f = open(os.path.join(self.dir, fn), 'rb')
1680 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001681
Serhiy Storchaka43767632013-11-03 21:31:38 +02001682 @unittest.skipUnless(hasattr(os, 'statvfs'),
1683 "need os.statvfs()")
1684 def test_statvfs(self):
1685 # issue #9645
1686 for fn in self.unicodefn:
1687 # should not fail with file not found error
1688 fullname = os.path.join(self.dir, fn)
1689 os.statvfs(fullname)
1690
1691 def test_stat(self):
1692 for fn in self.unicodefn:
1693 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001694
Brian Curtineb24d742010-04-12 17:16:38 +00001695@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1696class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001697 def _kill(self, sig):
1698 # Start sys.executable as a subprocess and communicate from the
1699 # subprocess to the parent that the interpreter is ready. When it
1700 # becomes ready, send *sig* via os.kill to the subprocess and check
1701 # that the return code is equal to *sig*.
1702 import ctypes
1703 from ctypes import wintypes
1704 import msvcrt
1705
1706 # Since we can't access the contents of the process' stdout until the
1707 # process has exited, use PeekNamedPipe to see what's inside stdout
1708 # without waiting. This is done so we can tell that the interpreter
1709 # is started and running at a point where it could handle a signal.
1710 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1711 PeekNamedPipe.restype = wintypes.BOOL
1712 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1713 ctypes.POINTER(ctypes.c_char), # stdout buf
1714 wintypes.DWORD, # Buffer size
1715 ctypes.POINTER(wintypes.DWORD), # bytes read
1716 ctypes.POINTER(wintypes.DWORD), # bytes avail
1717 ctypes.POINTER(wintypes.DWORD)) # bytes left
1718 msg = "running"
1719 proc = subprocess.Popen([sys.executable, "-c",
1720 "import sys;"
1721 "sys.stdout.write('{}');"
1722 "sys.stdout.flush();"
1723 "input()".format(msg)],
1724 stdout=subprocess.PIPE,
1725 stderr=subprocess.PIPE,
1726 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001727 self.addCleanup(proc.stdout.close)
1728 self.addCleanup(proc.stderr.close)
1729 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001730
1731 count, max = 0, 100
1732 while count < max and proc.poll() is None:
1733 # Create a string buffer to store the result of stdout from the pipe
1734 buf = ctypes.create_string_buffer(len(msg))
1735 # Obtain the text currently in proc.stdout
1736 # Bytes read/avail/left are left as NULL and unused
1737 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1738 buf, ctypes.sizeof(buf), None, None, None)
1739 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1740 if buf.value:
1741 self.assertEqual(msg, buf.value.decode())
1742 break
1743 time.sleep(0.1)
1744 count += 1
1745 else:
1746 self.fail("Did not receive communication from the subprocess")
1747
Brian Curtineb24d742010-04-12 17:16:38 +00001748 os.kill(proc.pid, sig)
1749 self.assertEqual(proc.wait(), sig)
1750
1751 def test_kill_sigterm(self):
1752 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001753 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001754
1755 def test_kill_int(self):
1756 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001757 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001758
1759 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001760 tagname = "test_os_%s" % uuid.uuid1()
1761 m = mmap.mmap(-1, 1, tagname)
1762 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001763 # Run a script which has console control handling enabled.
1764 proc = subprocess.Popen([sys.executable,
1765 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001766 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001767 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1768 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001769 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001770 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001771 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001772 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001773 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001774 count += 1
1775 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001776 # Forcefully kill the process if we weren't able to signal it.
1777 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001778 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001779 os.kill(proc.pid, event)
1780 # proc.send_signal(event) could also be done here.
1781 # Allow time for the signal to be passed and the process to exit.
1782 time.sleep(0.5)
1783 if not proc.poll():
1784 # Forcefully kill the process if we weren't able to signal it.
1785 os.kill(proc.pid, signal.SIGINT)
1786 self.fail("subprocess did not stop on {}".format(name))
1787
1788 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1789 def test_CTRL_C_EVENT(self):
1790 from ctypes import wintypes
1791 import ctypes
1792
1793 # Make a NULL value by creating a pointer with no argument.
1794 NULL = ctypes.POINTER(ctypes.c_int)()
1795 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1796 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1797 wintypes.BOOL)
1798 SetConsoleCtrlHandler.restype = wintypes.BOOL
1799
1800 # Calling this with NULL and FALSE causes the calling process to
1801 # handle CTRL+C, rather than ignore it. This property is inherited
1802 # by subprocesses.
1803 SetConsoleCtrlHandler(NULL, 0)
1804
1805 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1806
1807 def test_CTRL_BREAK_EVENT(self):
1808 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1809
1810
Brian Curtind40e6f72010-07-08 21:39:08 +00001811@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001812class Win32ListdirTests(unittest.TestCase):
1813 """Test listdir on Windows."""
1814
1815 def setUp(self):
1816 self.created_paths = []
1817 for i in range(2):
1818 dir_name = 'SUB%d' % i
1819 dir_path = os.path.join(support.TESTFN, dir_name)
1820 file_name = 'FILE%d' % i
1821 file_path = os.path.join(support.TESTFN, file_name)
1822 os.makedirs(dir_path)
1823 with open(file_path, 'w') as f:
1824 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1825 self.created_paths.extend([dir_name, file_name])
1826 self.created_paths.sort()
1827
1828 def tearDown(self):
1829 shutil.rmtree(support.TESTFN)
1830
1831 def test_listdir_no_extended_path(self):
1832 """Test when the path is not an "extended" path."""
1833 # unicode
1834 self.assertEqual(
1835 sorted(os.listdir(support.TESTFN)),
1836 self.created_paths)
1837 # bytes
1838 self.assertEqual(
1839 sorted(os.listdir(os.fsencode(support.TESTFN))),
1840 [os.fsencode(path) for path in self.created_paths])
1841
1842 def test_listdir_extended_path(self):
1843 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01001844 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01001845 # unicode
1846 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
1847 self.assertEqual(
1848 sorted(os.listdir(path)),
1849 self.created_paths)
1850 # bytes
1851 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
1852 self.assertEqual(
1853 sorted(os.listdir(path)),
1854 [os.fsencode(path) for path in self.created_paths])
1855
1856
1857@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001858@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001859class Win32SymlinkTests(unittest.TestCase):
1860 filelink = 'filelinktest'
1861 filelink_target = os.path.abspath(__file__)
1862 dirlink = 'dirlinktest'
1863 dirlink_target = os.path.dirname(filelink_target)
1864 missing_link = 'missing link'
1865
1866 def setUp(self):
1867 assert os.path.exists(self.dirlink_target)
1868 assert os.path.exists(self.filelink_target)
1869 assert not os.path.exists(self.dirlink)
1870 assert not os.path.exists(self.filelink)
1871 assert not os.path.exists(self.missing_link)
1872
1873 def tearDown(self):
1874 if os.path.exists(self.filelink):
1875 os.remove(self.filelink)
1876 if os.path.exists(self.dirlink):
1877 os.rmdir(self.dirlink)
1878 if os.path.lexists(self.missing_link):
1879 os.remove(self.missing_link)
1880
1881 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001882 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001883 self.assertTrue(os.path.exists(self.dirlink))
1884 self.assertTrue(os.path.isdir(self.dirlink))
1885 self.assertTrue(os.path.islink(self.dirlink))
1886 self.check_stat(self.dirlink, self.dirlink_target)
1887
1888 def test_file_link(self):
1889 os.symlink(self.filelink_target, self.filelink)
1890 self.assertTrue(os.path.exists(self.filelink))
1891 self.assertTrue(os.path.isfile(self.filelink))
1892 self.assertTrue(os.path.islink(self.filelink))
1893 self.check_stat(self.filelink, self.filelink_target)
1894
1895 def _create_missing_dir_link(self):
1896 'Create a "directory" link to a non-existent target'
1897 linkname = self.missing_link
1898 if os.path.lexists(linkname):
1899 os.remove(linkname)
1900 target = r'c:\\target does not exist.29r3c740'
1901 assert not os.path.exists(target)
1902 target_is_dir = True
1903 os.symlink(target, linkname, target_is_dir)
1904
1905 def test_remove_directory_link_to_missing_target(self):
1906 self._create_missing_dir_link()
1907 # For compatibility with Unix, os.remove will check the
1908 # directory status and call RemoveDirectory if the symlink
1909 # was created with target_is_dir==True.
1910 os.remove(self.missing_link)
1911
1912 @unittest.skip("currently fails; consider for improvement")
1913 def test_isdir_on_directory_link_to_missing_target(self):
1914 self._create_missing_dir_link()
1915 # consider having isdir return true for directory links
1916 self.assertTrue(os.path.isdir(self.missing_link))
1917
1918 @unittest.skip("currently fails; consider for improvement")
1919 def test_rmdir_on_directory_link_to_missing_target(self):
1920 self._create_missing_dir_link()
1921 # consider allowing rmdir to remove directory links
1922 os.rmdir(self.missing_link)
1923
1924 def check_stat(self, link, target):
1925 self.assertEqual(os.stat(link), os.stat(target))
1926 self.assertNotEqual(os.lstat(link), os.stat(link))
1927
Brian Curtind25aef52011-06-13 15:16:04 -05001928 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001929 with warnings.catch_warnings():
1930 warnings.simplefilter("ignore", DeprecationWarning)
1931 self.assertEqual(os.stat(bytes_link), os.stat(target))
1932 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001933
1934 def test_12084(self):
1935 level1 = os.path.abspath(support.TESTFN)
1936 level2 = os.path.join(level1, "level2")
1937 level3 = os.path.join(level2, "level3")
1938 try:
1939 os.mkdir(level1)
1940 os.mkdir(level2)
1941 os.mkdir(level3)
1942
1943 file1 = os.path.abspath(os.path.join(level1, "file1"))
1944
1945 with open(file1, "w") as f:
1946 f.write("file1")
1947
1948 orig_dir = os.getcwd()
1949 try:
1950 os.chdir(level2)
1951 link = os.path.join(level2, "link")
1952 os.symlink(os.path.relpath(file1), "link")
1953 self.assertIn("link", os.listdir(os.getcwd()))
1954
1955 # Check os.stat calls from the same dir as the link
1956 self.assertEqual(os.stat(file1), os.stat("link"))
1957
1958 # Check os.stat calls from a dir below the link
1959 os.chdir(level1)
1960 self.assertEqual(os.stat(file1),
1961 os.stat(os.path.relpath(link)))
1962
1963 # Check os.stat calls from a dir above the link
1964 os.chdir(level3)
1965 self.assertEqual(os.stat(file1),
1966 os.stat(os.path.relpath(link)))
1967 finally:
1968 os.chdir(orig_dir)
1969 except OSError as err:
1970 self.fail(err)
1971 finally:
1972 os.remove(file1)
1973 shutil.rmtree(level1)
1974
Brian Curtind40e6f72010-07-08 21:39:08 +00001975
Tim Golden0321cf22014-05-05 19:46:17 +01001976@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1977class Win32JunctionTests(unittest.TestCase):
1978 junction = 'junctiontest'
1979 junction_target = os.path.dirname(os.path.abspath(__file__))
1980
1981 def setUp(self):
1982 assert os.path.exists(self.junction_target)
1983 assert not os.path.exists(self.junction)
1984
1985 def tearDown(self):
1986 if os.path.exists(self.junction):
1987 # os.rmdir delegates to Windows' RemoveDirectoryW,
1988 # which removes junction points safely.
1989 os.rmdir(self.junction)
1990
1991 def test_create_junction(self):
1992 _winapi.CreateJunction(self.junction_target, self.junction)
1993 self.assertTrue(os.path.exists(self.junction))
1994 self.assertTrue(os.path.isdir(self.junction))
1995
1996 # Junctions are not recognized as links.
1997 self.assertFalse(os.path.islink(self.junction))
1998
1999 def test_unlink_removes_junction(self):
2000 _winapi.CreateJunction(self.junction_target, self.junction)
2001 self.assertTrue(os.path.exists(self.junction))
2002
2003 os.unlink(self.junction)
2004 self.assertFalse(os.path.exists(self.junction))
2005
2006
Jason R. Coombs3a092862013-05-27 23:21:28 -04002007@support.skip_unless_symlink
2008class NonLocalSymlinkTests(unittest.TestCase):
2009
2010 def setUp(self):
2011 """
2012 Create this structure:
2013
2014 base
2015 \___ some_dir
2016 """
2017 os.makedirs('base/some_dir')
2018
2019 def tearDown(self):
2020 shutil.rmtree('base')
2021
2022 def test_directory_link_nonlocal(self):
2023 """
2024 The symlink target should resolve relative to the link, not relative
2025 to the current directory.
2026
2027 Then, link base/some_link -> base/some_dir and ensure that some_link
2028 is resolved as a directory.
2029
2030 In issue13772, it was discovered that directory detection failed if
2031 the symlink target was not specified relative to the current
2032 directory, which was a defect in the implementation.
2033 """
2034 src = os.path.join('base', 'some_link')
2035 os.symlink('some_dir', src)
2036 assert os.path.isdir(src)
2037
2038
Victor Stinnere8d51452010-08-19 01:05:19 +00002039class FSEncodingTests(unittest.TestCase):
2040 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002041 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2042 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002043
Victor Stinnere8d51452010-08-19 01:05:19 +00002044 def test_identity(self):
2045 # assert fsdecode(fsencode(x)) == x
2046 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2047 try:
2048 bytesfn = os.fsencode(fn)
2049 except UnicodeEncodeError:
2050 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002051 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002052
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002053
Brett Cannonefb00c02012-02-29 18:31:31 -05002054
2055class DeviceEncodingTests(unittest.TestCase):
2056
2057 def test_bad_fd(self):
2058 # Return None when an fd doesn't actually exist.
2059 self.assertIsNone(os.device_encoding(123456))
2060
Philip Jenveye308b7c2012-02-29 16:16:15 -08002061 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2062 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002063 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002064 def test_device_encoding(self):
2065 encoding = os.device_encoding(0)
2066 self.assertIsNotNone(encoding)
2067 self.assertTrue(codecs.lookup(encoding))
2068
2069
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002070class PidTests(unittest.TestCase):
2071 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2072 def test_getppid(self):
2073 p = subprocess.Popen([sys.executable, '-c',
2074 'import os; print(os.getppid())'],
2075 stdout=subprocess.PIPE)
2076 stdout, _ = p.communicate()
2077 # We are the parent of our subprocess
2078 self.assertEqual(int(stdout), os.getpid())
2079
2080
Brian Curtin0151b8e2010-09-24 13:43:43 +00002081# The introduction of this TestCase caused at least two different errors on
2082# *nix buildbots. Temporarily skip this to let the buildbots move along.
2083@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002084@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2085class LoginTests(unittest.TestCase):
2086 def test_getlogin(self):
2087 user_name = os.getlogin()
2088 self.assertNotEqual(len(user_name), 0)
2089
2090
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002091@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2092 "needs os.getpriority and os.setpriority")
2093class ProgramPriorityTests(unittest.TestCase):
2094 """Tests for os.getpriority() and os.setpriority()."""
2095
2096 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002097
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002098 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2099 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2100 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002101 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2102 if base >= 19 and new_prio <= 19:
2103 raise unittest.SkipTest(
2104 "unable to reliably test setpriority at current nice level of %s" % base)
2105 else:
2106 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002107 finally:
2108 try:
2109 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2110 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002111 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002112 raise
2113
2114
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002115if threading is not None:
2116 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002117
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002118 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002119
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002120 def __init__(self, conn):
2121 asynchat.async_chat.__init__(self, conn)
2122 self.in_buffer = []
2123 self.closed = False
2124 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002125
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002126 def handle_read(self):
2127 data = self.recv(4096)
2128 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002129
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002130 def get_data(self):
2131 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002132
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002133 def handle_close(self):
2134 self.close()
2135 self.closed = True
2136
2137 def handle_error(self):
2138 raise
2139
2140 def __init__(self, address):
2141 threading.Thread.__init__(self)
2142 asyncore.dispatcher.__init__(self)
2143 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2144 self.bind(address)
2145 self.listen(5)
2146 self.host, self.port = self.socket.getsockname()[:2]
2147 self.handler_instance = None
2148 self._active = False
2149 self._active_lock = threading.Lock()
2150
2151 # --- public API
2152
2153 @property
2154 def running(self):
2155 return self._active
2156
2157 def start(self):
2158 assert not self.running
2159 self.__flag = threading.Event()
2160 threading.Thread.start(self)
2161 self.__flag.wait()
2162
2163 def stop(self):
2164 assert self.running
2165 self._active = False
2166 self.join()
2167
2168 def wait(self):
2169 # wait for handler connection to be closed, then stop the server
2170 while not getattr(self.handler_instance, "closed", False):
2171 time.sleep(0.001)
2172 self.stop()
2173
2174 # --- internals
2175
2176 def run(self):
2177 self._active = True
2178 self.__flag.set()
2179 while self._active and asyncore.socket_map:
2180 self._active_lock.acquire()
2181 asyncore.loop(timeout=0.001, count=1)
2182 self._active_lock.release()
2183 asyncore.close_all()
2184
2185 def handle_accept(self):
2186 conn, addr = self.accept()
2187 self.handler_instance = self.Handler(conn)
2188
2189 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002190 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002191 handle_read = handle_connect
2192
2193 def writable(self):
2194 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002195
2196 def handle_error(self):
2197 raise
2198
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002199
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002200@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002201@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2202class TestSendfile(unittest.TestCase):
2203
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002204 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002205 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002206 not sys.platform.startswith("solaris") and \
2207 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002208 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2209 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002210
2211 @classmethod
2212 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002213 cls.key = support.threading_setup()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002214 with open(support.TESTFN, "wb") as f:
2215 f.write(cls.DATA)
2216
2217 @classmethod
2218 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002219 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002220 support.unlink(support.TESTFN)
2221
2222 def setUp(self):
2223 self.server = SendfileTestServer((support.HOST, 0))
2224 self.server.start()
2225 self.client = socket.socket()
2226 self.client.connect((self.server.host, self.server.port))
2227 self.client.settimeout(1)
2228 # synchronize by waiting for "220 ready" response
2229 self.client.recv(1024)
2230 self.sockno = self.client.fileno()
2231 self.file = open(support.TESTFN, 'rb')
2232 self.fileno = self.file.fileno()
2233
2234 def tearDown(self):
2235 self.file.close()
2236 self.client.close()
2237 if self.server.running:
2238 self.server.stop()
2239
2240 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2241 """A higher level wrapper representing how an application is
2242 supposed to use sendfile().
2243 """
2244 while 1:
2245 try:
2246 if self.SUPPORT_HEADERS_TRAILERS:
2247 return os.sendfile(sock, file, offset, nbytes, headers,
2248 trailers)
2249 else:
2250 return os.sendfile(sock, file, offset, nbytes)
2251 except OSError as err:
2252 if err.errno == errno.ECONNRESET:
2253 # disconnected
2254 raise
2255 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2256 # we have to retry send data
2257 continue
2258 else:
2259 raise
2260
2261 def test_send_whole_file(self):
2262 # normal send
2263 total_sent = 0
2264 offset = 0
2265 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002266 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002267 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2268 if sent == 0:
2269 break
2270 offset += sent
2271 total_sent += sent
2272 self.assertTrue(sent <= nbytes)
2273 self.assertEqual(offset, total_sent)
2274
2275 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002276 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002277 self.client.close()
2278 self.server.wait()
2279 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002280 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002281 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002282
2283 def test_send_at_certain_offset(self):
2284 # start sending a file at a certain offset
2285 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002286 offset = len(self.DATA) // 2
2287 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002288 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002289 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002290 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2291 if sent == 0:
2292 break
2293 offset += sent
2294 total_sent += sent
2295 self.assertTrue(sent <= nbytes)
2296
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002297 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002298 self.client.close()
2299 self.server.wait()
2300 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002301 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002302 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002303 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002304 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002305
2306 def test_offset_overflow(self):
2307 # specify an offset > file size
2308 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002309 try:
2310 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2311 except OSError as e:
2312 # Solaris can raise EINVAL if offset >= file length, ignore.
2313 if e.errno != errno.EINVAL:
2314 raise
2315 else:
2316 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002317 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002318 self.client.close()
2319 self.server.wait()
2320 data = self.server.handler_instance.get_data()
2321 self.assertEqual(data, b'')
2322
2323 def test_invalid_offset(self):
2324 with self.assertRaises(OSError) as cm:
2325 os.sendfile(self.sockno, self.fileno, -1, 4096)
2326 self.assertEqual(cm.exception.errno, errno.EINVAL)
2327
Martin Panterbf19d162015-09-09 01:01:13 +00002328 def test_keywords(self):
2329 # Keyword arguments should be supported
2330 os.sendfile(out=self.sockno, offset=0, count=4096,
2331 **{'in': self.fileno})
2332 if self.SUPPORT_HEADERS_TRAILERS:
2333 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
2334 headers=None, trailers=None, flags=0)
2335
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002336 # --- headers / trailers tests
2337
Serhiy Storchaka43767632013-11-03 21:31:38 +02002338 @requires_headers_trailers
2339 def test_headers(self):
2340 total_sent = 0
2341 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2342 headers=[b"x" * 512])
2343 total_sent += sent
2344 offset = 4096
2345 nbytes = 4096
2346 while 1:
2347 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2348 offset, nbytes)
2349 if sent == 0:
2350 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002351 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002352 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002353
Serhiy Storchaka43767632013-11-03 21:31:38 +02002354 expected_data = b"x" * 512 + self.DATA
2355 self.assertEqual(total_sent, len(expected_data))
2356 self.client.close()
2357 self.server.wait()
2358 data = self.server.handler_instance.get_data()
2359 self.assertEqual(hash(data), hash(expected_data))
2360
2361 @requires_headers_trailers
2362 def test_trailers(self):
2363 TESTFN2 = support.TESTFN + "2"
2364 file_data = b"abcdef"
2365 with open(TESTFN2, 'wb') as f:
2366 f.write(file_data)
2367 with open(TESTFN2, 'rb')as f:
2368 self.addCleanup(os.remove, TESTFN2)
2369 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2370 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002371 self.client.close()
2372 self.server.wait()
2373 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002374 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002375
Serhiy Storchaka43767632013-11-03 21:31:38 +02002376 @requires_headers_trailers
2377 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2378 'test needs os.SF_NODISKIO')
2379 def test_flags(self):
2380 try:
2381 os.sendfile(self.sockno, self.fileno, 0, 4096,
2382 flags=os.SF_NODISKIO)
2383 except OSError as err:
2384 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2385 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002386
2387
Larry Hastings9cf065c2012-06-22 16:30:09 -07002388def supports_extended_attributes():
2389 if not hasattr(os, "setxattr"):
2390 return False
2391 try:
2392 with open(support.TESTFN, "wb") as fp:
2393 try:
2394 os.setxattr(fp.fileno(), b"user.test", b"")
2395 except OSError:
2396 return False
2397 finally:
2398 support.unlink(support.TESTFN)
2399 # Kernels < 2.6.39 don't respect setxattr flags.
2400 kernel_version = platform.release()
2401 m = re.match("2.6.(\d{1,2})", kernel_version)
2402 return m is None or int(m.group(1)) >= 39
2403
2404
2405@unittest.skipUnless(supports_extended_attributes(),
2406 "no non-broken extended attribute support")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002407class ExtendedAttributeTests(unittest.TestCase):
2408
2409 def tearDown(self):
2410 support.unlink(support.TESTFN)
2411
Larry Hastings9cf065c2012-06-22 16:30:09 -07002412 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002413 fn = support.TESTFN
2414 open(fn, "wb").close()
2415 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002416 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002417 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002418 init_xattr = listxattr(fn)
2419 self.assertIsInstance(init_xattr, list)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002420 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002421 xattr = set(init_xattr)
2422 xattr.add("user.test")
2423 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002424 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2425 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2426 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002427 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002428 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002429 self.assertEqual(cm.exception.errno, errno.EEXIST)
2430 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002431 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002432 self.assertEqual(cm.exception.errno, errno.ENODATA)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002433 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002434 xattr.add("user.test2")
2435 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002436 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002437 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002438 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002439 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002440 xattr.remove("user.test")
2441 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002442 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2443 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2444 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2445 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002446 many = sorted("user.test{}".format(i) for i in range(100))
2447 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002448 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002449 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002450
Larry Hastings9cf065c2012-06-22 16:30:09 -07002451 def _check_xattrs(self, *args, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002452 def make_bytes(s):
2453 return bytes(s, "ascii")
Larry Hastings9cf065c2012-06-22 16:30:09 -07002454 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002455 support.unlink(support.TESTFN)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002456 self._check_xattrs_str(make_bytes, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002457
2458 def test_simple(self):
2459 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2460 os.listxattr)
2461
2462 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002463 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2464 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002465
2466 def test_fds(self):
2467 def getxattr(path, *args):
2468 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002469 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002470 def setxattr(path, *args):
2471 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002472 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002473 def removexattr(path, *args):
2474 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002475 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002476 def listxattr(path, *args):
2477 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002478 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002479 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2480
2481
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002482@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2483class Win32DeprecatedBytesAPI(unittest.TestCase):
2484 def test_deprecated(self):
2485 import nt
2486 filename = os.fsencode(support.TESTFN)
2487 with warnings.catch_warnings():
2488 warnings.simplefilter("error", DeprecationWarning)
2489 for func, *args in (
2490 (nt._getfullpathname, filename),
2491 (nt._isdir, filename),
2492 (os.access, filename, os.R_OK),
2493 (os.chdir, filename),
2494 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01002495 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002496 (os.link, filename, filename),
2497 (os.listdir, filename),
2498 (os.lstat, filename),
2499 (os.mkdir, filename),
2500 (os.open, filename, os.O_RDONLY),
2501 (os.rename, filename, filename),
2502 (os.rmdir, filename),
2503 (os.startfile, filename),
2504 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002505 (os.unlink, filename),
2506 (os.utime, filename),
2507 ):
2508 self.assertRaises(DeprecationWarning, func, *args)
2509
Victor Stinner28216442011-11-16 00:34:44 +01002510 @support.skip_unless_symlink
2511 def test_symlink(self):
2512 filename = os.fsencode(support.TESTFN)
2513 with warnings.catch_warnings():
2514 warnings.simplefilter("error", DeprecationWarning)
2515 self.assertRaises(DeprecationWarning,
2516 os.symlink, filename, filename)
2517
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002518
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002519@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2520class TermsizeTests(unittest.TestCase):
2521 def test_does_not_crash(self):
2522 """Check if get_terminal_size() returns a meaningful value.
2523
2524 There's no easy portable way to actually check the size of the
2525 terminal, so let's check if it returns something sensible instead.
2526 """
2527 try:
2528 size = os.get_terminal_size()
2529 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002530 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002531 # Under win32 a generic OSError can be thrown if the
2532 # handle cannot be retrieved
2533 self.skipTest("failed to query terminal size")
2534 raise
2535
Antoine Pitroucfade362012-02-08 23:48:59 +01002536 self.assertGreaterEqual(size.columns, 0)
2537 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002538
2539 def test_stty_match(self):
2540 """Check if stty returns the same results
2541
2542 stty actually tests stdin, so get_terminal_size is invoked on
2543 stdin explicitly. If stty succeeded, then get_terminal_size()
2544 should work too.
2545 """
2546 try:
2547 size = subprocess.check_output(['stty', 'size']).decode().split()
2548 except (FileNotFoundError, subprocess.CalledProcessError):
2549 self.skipTest("stty invocation failed")
2550 expected = (int(size[1]), int(size[0])) # reversed order
2551
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002552 try:
2553 actual = os.get_terminal_size(sys.__stdin__.fileno())
2554 except OSError as e:
2555 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2556 # Under win32 a generic OSError can be thrown if the
2557 # handle cannot be retrieved
2558 self.skipTest("failed to query terminal size")
2559 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002560 self.assertEqual(expected, actual)
2561
2562
Victor Stinner292c8352012-10-30 02:17:38 +01002563class OSErrorTests(unittest.TestCase):
2564 def setUp(self):
2565 class Str(str):
2566 pass
2567
Victor Stinnerafe17062012-10-31 22:47:43 +01002568 self.bytes_filenames = []
2569 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002570 if support.TESTFN_UNENCODABLE is not None:
2571 decoded = support.TESTFN_UNENCODABLE
2572 else:
2573 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002574 self.unicode_filenames.append(decoded)
2575 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002576 if support.TESTFN_UNDECODABLE is not None:
2577 encoded = support.TESTFN_UNDECODABLE
2578 else:
2579 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002580 self.bytes_filenames.append(encoded)
2581 self.bytes_filenames.append(memoryview(encoded))
2582
2583 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002584
2585 def test_oserror_filename(self):
2586 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002587 (self.filenames, os.chdir,),
2588 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002589 (self.filenames, os.lstat,),
2590 (self.filenames, os.open, os.O_RDONLY),
2591 (self.filenames, os.rmdir,),
2592 (self.filenames, os.stat,),
2593 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002594 ]
2595 if sys.platform == "win32":
2596 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002597 (self.bytes_filenames, os.rename, b"dst"),
2598 (self.bytes_filenames, os.replace, b"dst"),
2599 (self.unicode_filenames, os.rename, "dst"),
2600 (self.unicode_filenames, os.replace, "dst"),
Victor Stinner64e039a2012-11-07 00:10:14 +01002601 # Issue #16414: Don't test undecodable names with listdir()
2602 # because of a Windows bug.
2603 #
2604 # With the ANSI code page 932, os.listdir(b'\xe7') return an
2605 # empty list (instead of failing), whereas os.listdir(b'\xff')
2606 # raises a FileNotFoundError. It looks like a Windows bug:
2607 # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
2608 # fails with ERROR_FILE_NOT_FOUND (2), instead of
2609 # ERROR_PATH_NOT_FOUND (3).
2610 (self.unicode_filenames, os.listdir,),
Victor Stinner292c8352012-10-30 02:17:38 +01002611 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002612 else:
2613 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002614 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002615 (self.filenames, os.rename, "dst"),
2616 (self.filenames, os.replace, "dst"),
2617 ))
2618 if hasattr(os, "chown"):
2619 funcs.append((self.filenames, os.chown, 0, 0))
2620 if hasattr(os, "lchown"):
2621 funcs.append((self.filenames, os.lchown, 0, 0))
2622 if hasattr(os, "truncate"):
2623 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002624 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002625 funcs.append((self.filenames, os.chflags, 0))
2626 if hasattr(os, "lchflags"):
2627 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002628 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002629 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002630 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002631 if sys.platform == "win32":
2632 funcs.append((self.bytes_filenames, os.link, b"dst"))
2633 funcs.append((self.unicode_filenames, os.link, "dst"))
2634 else:
2635 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002636 if hasattr(os, "listxattr"):
2637 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002638 (self.filenames, os.listxattr,),
2639 (self.filenames, os.getxattr, "user.test"),
2640 (self.filenames, os.setxattr, "user.test", b'user'),
2641 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002642 ))
2643 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002644 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002645 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002646 if sys.platform == "win32":
2647 funcs.append((self.unicode_filenames, os.readlink,))
2648 else:
2649 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002650
Victor Stinnerafe17062012-10-31 22:47:43 +01002651 for filenames, func, *func_args in funcs:
2652 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002653 try:
2654 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002655 except OSError as err:
Victor Stinner292c8352012-10-30 02:17:38 +01002656 self.assertIs(err.filename, name)
2657 else:
2658 self.fail("No exception thrown by {}".format(func))
2659
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002660class CPUCountTests(unittest.TestCase):
2661 def test_cpu_count(self):
2662 cpus = os.cpu_count()
2663 if cpus is not None:
2664 self.assertIsInstance(cpus, int)
2665 self.assertGreater(cpus, 0)
2666 else:
2667 self.skipTest("Could not determine the number of CPUs")
2668
Victor Stinnerdaf45552013-08-28 00:53:59 +02002669
2670class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002671 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002672 fd = os.open(__file__, os.O_RDONLY)
2673 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002674 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002675
Victor Stinnerdaf45552013-08-28 00:53:59 +02002676 os.set_inheritable(fd, True)
2677 self.assertEqual(os.get_inheritable(fd), True)
2678
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002679 @unittest.skipIf(fcntl is None, "need fcntl")
2680 def test_get_inheritable_cloexec(self):
2681 fd = os.open(__file__, os.O_RDONLY)
2682 self.addCleanup(os.close, fd)
2683 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002684
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002685 # clear FD_CLOEXEC flag
2686 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2687 flags &= ~fcntl.FD_CLOEXEC
2688 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002689
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002690 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002691
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002692 @unittest.skipIf(fcntl is None, "need fcntl")
2693 def test_set_inheritable_cloexec(self):
2694 fd = os.open(__file__, os.O_RDONLY)
2695 self.addCleanup(os.close, fd)
2696 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2697 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002698
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002699 os.set_inheritable(fd, True)
2700 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2701 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002702
Victor Stinnerdaf45552013-08-28 00:53:59 +02002703 def test_open(self):
2704 fd = os.open(__file__, os.O_RDONLY)
2705 self.addCleanup(os.close, fd)
2706 self.assertEqual(os.get_inheritable(fd), False)
2707
2708 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2709 def test_pipe(self):
2710 rfd, wfd = os.pipe()
2711 self.addCleanup(os.close, rfd)
2712 self.addCleanup(os.close, wfd)
2713 self.assertEqual(os.get_inheritable(rfd), False)
2714 self.assertEqual(os.get_inheritable(wfd), False)
2715
2716 def test_dup(self):
2717 fd1 = os.open(__file__, os.O_RDONLY)
2718 self.addCleanup(os.close, fd1)
2719
2720 fd2 = os.dup(fd1)
2721 self.addCleanup(os.close, fd2)
2722 self.assertEqual(os.get_inheritable(fd2), False)
2723
2724 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2725 def test_dup2(self):
2726 fd = os.open(__file__, os.O_RDONLY)
2727 self.addCleanup(os.close, fd)
2728
2729 # inheritable by default
2730 fd2 = os.open(__file__, os.O_RDONLY)
2731 try:
2732 os.dup2(fd, fd2)
2733 self.assertEqual(os.get_inheritable(fd2), True)
2734 finally:
2735 os.close(fd2)
2736
2737 # force non-inheritable
2738 fd3 = os.open(__file__, os.O_RDONLY)
2739 try:
2740 os.dup2(fd, fd3, inheritable=False)
2741 self.assertEqual(os.get_inheritable(fd3), False)
2742 finally:
2743 os.close(fd3)
2744
2745 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2746 def test_openpty(self):
2747 master_fd, slave_fd = os.openpty()
2748 self.addCleanup(os.close, master_fd)
2749 self.addCleanup(os.close, slave_fd)
2750 self.assertEqual(os.get_inheritable(master_fd), False)
2751 self.assertEqual(os.get_inheritable(slave_fd), False)
2752
2753
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002754@unittest.skipUnless(hasattr(os, 'get_blocking'),
2755 'needs os.get_blocking() and os.set_blocking()')
2756class BlockingTests(unittest.TestCase):
2757 def test_blocking(self):
2758 fd = os.open(__file__, os.O_RDONLY)
2759 self.addCleanup(os.close, fd)
2760 self.assertEqual(os.get_blocking(fd), True)
2761
2762 os.set_blocking(fd, False)
2763 self.assertEqual(os.get_blocking(fd), False)
2764
2765 os.set_blocking(fd, True)
2766 self.assertEqual(os.get_blocking(fd), True)
2767
2768
Yury Selivanov97e2e062014-09-26 12:33:06 -04002769
2770class ExportsTests(unittest.TestCase):
2771 def test_os_all(self):
2772 self.assertIn('open', os.__all__)
2773 self.assertIn('walk', os.__all__)
2774
2775
Victor Stinner6036e442015-03-08 01:58:04 +01002776class TestScandir(unittest.TestCase):
2777 def setUp(self):
2778 self.path = os.path.realpath(support.TESTFN)
2779 self.addCleanup(support.rmtree, self.path)
2780 os.mkdir(self.path)
2781
2782 def create_file(self, name="file.txt"):
2783 filename = os.path.join(self.path, name)
2784 with open(filename, "wb") as fp:
2785 fp.write(b'python')
2786 return filename
2787
2788 def get_entries(self, names):
2789 entries = dict((entry.name, entry)
2790 for entry in os.scandir(self.path))
2791 self.assertEqual(sorted(entries.keys()), names)
2792 return entries
2793
2794 def assert_stat_equal(self, stat1, stat2, skip_fields):
2795 if skip_fields:
2796 for attr in dir(stat1):
2797 if not attr.startswith("st_"):
2798 continue
2799 if attr in ("st_dev", "st_ino", "st_nlink"):
2800 continue
2801 self.assertEqual(getattr(stat1, attr),
2802 getattr(stat2, attr),
2803 (stat1, stat2, attr))
2804 else:
2805 self.assertEqual(stat1, stat2)
2806
2807 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
2808 self.assertEqual(entry.name, name)
2809 self.assertEqual(entry.path, os.path.join(self.path, name))
2810 self.assertEqual(entry.inode(),
2811 os.stat(entry.path, follow_symlinks=False).st_ino)
2812
2813 entry_stat = os.stat(entry.path)
2814 self.assertEqual(entry.is_dir(),
2815 stat.S_ISDIR(entry_stat.st_mode))
2816 self.assertEqual(entry.is_file(),
2817 stat.S_ISREG(entry_stat.st_mode))
2818 self.assertEqual(entry.is_symlink(),
2819 os.path.islink(entry.path))
2820
2821 entry_lstat = os.stat(entry.path, follow_symlinks=False)
2822 self.assertEqual(entry.is_dir(follow_symlinks=False),
2823 stat.S_ISDIR(entry_lstat.st_mode))
2824 self.assertEqual(entry.is_file(follow_symlinks=False),
2825 stat.S_ISREG(entry_lstat.st_mode))
2826
2827 self.assert_stat_equal(entry.stat(),
2828 entry_stat,
2829 os.name == 'nt' and not is_symlink)
2830 self.assert_stat_equal(entry.stat(follow_symlinks=False),
2831 entry_lstat,
2832 os.name == 'nt')
2833
2834 def test_attributes(self):
2835 link = hasattr(os, 'link')
2836 symlink = support.can_symlink()
2837
2838 dirname = os.path.join(self.path, "dir")
2839 os.mkdir(dirname)
2840 filename = self.create_file("file.txt")
2841 if link:
2842 os.link(filename, os.path.join(self.path, "link_file.txt"))
2843 if symlink:
2844 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
2845 target_is_directory=True)
2846 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
2847
2848 names = ['dir', 'file.txt']
2849 if link:
2850 names.append('link_file.txt')
2851 if symlink:
2852 names.extend(('symlink_dir', 'symlink_file.txt'))
2853 entries = self.get_entries(names)
2854
2855 entry = entries['dir']
2856 self.check_entry(entry, 'dir', True, False, False)
2857
2858 entry = entries['file.txt']
2859 self.check_entry(entry, 'file.txt', False, True, False)
2860
2861 if link:
2862 entry = entries['link_file.txt']
2863 self.check_entry(entry, 'link_file.txt', False, True, False)
2864
2865 if symlink:
2866 entry = entries['symlink_dir']
2867 self.check_entry(entry, 'symlink_dir', True, False, True)
2868
2869 entry = entries['symlink_file.txt']
2870 self.check_entry(entry, 'symlink_file.txt', False, True, True)
2871
2872 def get_entry(self, name):
2873 entries = list(os.scandir(self.path))
2874 self.assertEqual(len(entries), 1)
2875
2876 entry = entries[0]
2877 self.assertEqual(entry.name, name)
2878 return entry
2879
2880 def create_file_entry(self):
2881 filename = self.create_file()
2882 return self.get_entry(os.path.basename(filename))
2883
2884 def test_current_directory(self):
2885 filename = self.create_file()
2886 old_dir = os.getcwd()
2887 try:
2888 os.chdir(self.path)
2889
2890 # call scandir() without parameter: it must list the content
2891 # of the current directory
2892 entries = dict((entry.name, entry) for entry in os.scandir())
2893 self.assertEqual(sorted(entries.keys()),
2894 [os.path.basename(filename)])
2895 finally:
2896 os.chdir(old_dir)
2897
2898 def test_repr(self):
2899 entry = self.create_file_entry()
2900 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
2901
2902 def test_removed_dir(self):
2903 path = os.path.join(self.path, 'dir')
2904
2905 os.mkdir(path)
2906 entry = self.get_entry('dir')
2907 os.rmdir(path)
2908
2909 # On POSIX, is_dir() result depends if scandir() filled d_type or not
2910 if os.name == 'nt':
2911 self.assertTrue(entry.is_dir())
2912 self.assertFalse(entry.is_file())
2913 self.assertFalse(entry.is_symlink())
2914 if os.name == 'nt':
2915 self.assertRaises(FileNotFoundError, entry.inode)
2916 # don't fail
2917 entry.stat()
2918 entry.stat(follow_symlinks=False)
2919 else:
2920 self.assertGreater(entry.inode(), 0)
2921 self.assertRaises(FileNotFoundError, entry.stat)
2922 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
2923
2924 def test_removed_file(self):
2925 entry = self.create_file_entry()
2926 os.unlink(entry.path)
2927
2928 self.assertFalse(entry.is_dir())
2929 # On POSIX, is_dir() result depends if scandir() filled d_type or not
2930 if os.name == 'nt':
2931 self.assertTrue(entry.is_file())
2932 self.assertFalse(entry.is_symlink())
2933 if os.name == 'nt':
2934 self.assertRaises(FileNotFoundError, entry.inode)
2935 # don't fail
2936 entry.stat()
2937 entry.stat(follow_symlinks=False)
2938 else:
2939 self.assertGreater(entry.inode(), 0)
2940 self.assertRaises(FileNotFoundError, entry.stat)
2941 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
2942
2943 def test_broken_symlink(self):
2944 if not support.can_symlink():
2945 return self.skipTest('cannot create symbolic link')
2946
2947 filename = self.create_file("file.txt")
2948 os.symlink(filename,
2949 os.path.join(self.path, "symlink.txt"))
2950 entries = self.get_entries(['file.txt', 'symlink.txt'])
2951 entry = entries['symlink.txt']
2952 os.unlink(filename)
2953
2954 self.assertGreater(entry.inode(), 0)
2955 self.assertFalse(entry.is_dir())
2956 self.assertFalse(entry.is_file()) # broken symlink returns False
2957 self.assertFalse(entry.is_dir(follow_symlinks=False))
2958 self.assertFalse(entry.is_file(follow_symlinks=False))
2959 self.assertTrue(entry.is_symlink())
2960 self.assertRaises(FileNotFoundError, entry.stat)
2961 # don't fail
2962 entry.stat(follow_symlinks=False)
2963
2964 def test_bytes(self):
2965 if os.name == "nt":
2966 # On Windows, os.scandir(bytes) must raise an exception
2967 self.assertRaises(TypeError, os.scandir, b'.')
2968 return
2969
2970 self.create_file("file.txt")
2971
2972 path_bytes = os.fsencode(self.path)
2973 entries = list(os.scandir(path_bytes))
2974 self.assertEqual(len(entries), 1, entries)
2975 entry = entries[0]
2976
2977 self.assertEqual(entry.name, b'file.txt')
2978 self.assertEqual(entry.path,
2979 os.fsencode(os.path.join(self.path, 'file.txt')))
2980
2981 def test_empty_path(self):
2982 self.assertRaises(FileNotFoundError, os.scandir, '')
2983
2984 def test_consume_iterator_twice(self):
2985 self.create_file("file.txt")
2986 iterator = os.scandir(self.path)
2987
2988 entries = list(iterator)
2989 self.assertEqual(len(entries), 1, entries)
2990
2991 # check than consuming the iterator twice doesn't raise exception
2992 entries2 = list(iterator)
2993 self.assertEqual(len(entries2), 0, entries2)
2994
2995 def test_bad_path_type(self):
2996 for obj in [1234, 1.234, {}, []]:
2997 self.assertRaises(TypeError, os.scandir, obj)
2998
2999
Fred Drake2e2be372001-09-20 21:33:42 +00003000if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003001 unittest.main()