blob: 62815965e8b39ed1e8edf4f5e07488924384ed0d [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070027import decimal
28import fractions
Christian Heimes25827622013-10-12 01:27:08 +020029import pickle
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000030try:
31 import threading
32except ImportError:
33 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020047 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020048except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020049 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020050
Georg Brandl2daf6ae2012-02-20 19:54:16 +010051from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000052
Victor Stinner034d0aa2012-06-05 01:22:15 +020053with warnings.catch_warnings():
54 warnings.simplefilter("ignore", DeprecationWarning)
55 os.stat_float_times(True)
Victor Stinner1aa54a42012-02-08 04:09:37 +010056st = os.stat(__file__)
57stat_supports_subsecond = (
58 # check if float and int timestamps are different
59 (st.st_atime != st[7])
60 or (st.st_mtime != st[8])
61 or (st.st_ctime != st[9]))
62
Mark Dickinson7cf03892010-04-16 13:45:35 +000063# Detect whether we're on a Linux system that uses the (now outdated
64# and unmaintained) linuxthreads threading library. There's an issue
65# when combining linuxthreads with a failed execv call: see
66# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020067if hasattr(sys, 'thread_info') and sys.thread_info.version:
68 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
69else:
70 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000071
Stefan Krahebee49a2013-01-17 15:31:00 +010072# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
73HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
74
Thomas Wouters0e3f5912006-08-11 14:57:12 +000075# Tests creating TESTFN
76class FileTests(unittest.TestCase):
77 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000078 if os.path.exists(support.TESTFN):
79 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000080 tearDown = setUp
81
82 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000083 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000084 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000085 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000086
Christian Heimesfdab48e2008-01-20 09:06:41 +000087 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000088 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
89 # We must allocate two consecutive file descriptors, otherwise
90 # it will mess up other file descriptors (perhaps even the three
91 # standard ones).
92 second = os.dup(first)
93 try:
94 retries = 0
95 while second != first + 1:
96 os.close(first)
97 retries += 1
98 if retries > 10:
99 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000100 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000101 first, second = second, os.dup(second)
102 finally:
103 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000104 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000105 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000106 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000107
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000108 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000109 def test_rename(self):
110 path = support.TESTFN
111 old = sys.getrefcount(path)
112 self.assertRaises(TypeError, os.rename, path, 0)
113 new = sys.getrefcount(path)
114 self.assertEqual(old, new)
115
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000116 def test_read(self):
117 with open(support.TESTFN, "w+b") as fobj:
118 fobj.write(b"spam")
119 fobj.flush()
120 fd = fobj.fileno()
121 os.lseek(fd, 0, 0)
122 s = os.read(fd, 4)
123 self.assertEqual(type(s), bytes)
124 self.assertEqual(s, b"spam")
125
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200126 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200127 # Skip the test on 32-bit platforms: the number of bytes must fit in a
128 # Py_ssize_t type
129 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
130 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200131 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
132 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200133 with open(support.TESTFN, "wb") as fp:
134 fp.write(b'test')
135 self.addCleanup(support.unlink, support.TESTFN)
136
137 # Issue #21932: Make sure that os.read() does not raise an
138 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200139 with open(support.TESTFN, "rb") as fp:
140 data = os.read(fp.fileno(), size)
141
142 # The test does not try to read more than 2 GB at once because the
143 # operating system is free to return less bytes than requested.
144 self.assertEqual(data, b'test')
145
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000146 def test_write(self):
147 # os.write() accepts bytes- and buffer-like objects but not strings
148 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
149 self.assertRaises(TypeError, os.write, fd, "beans")
150 os.write(fd, b"bacon\n")
151 os.write(fd, bytearray(b"eggs\n"))
152 os.write(fd, memoryview(b"spam\n"))
153 os.close(fd)
154 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000155 self.assertEqual(fobj.read().splitlines(),
156 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000157
Victor Stinnere0daff12011-03-20 23:36:35 +0100158 def write_windows_console(self, *args):
159 retcode = subprocess.call(args,
160 # use a new console to not flood the test output
161 creationflags=subprocess.CREATE_NEW_CONSOLE,
162 # use a shell to hide the console window (SW_HIDE)
163 shell=True)
164 self.assertEqual(retcode, 0)
165
166 @unittest.skipUnless(sys.platform == 'win32',
167 'test specific to the Windows console')
168 def test_write_windows_console(self):
169 # Issue #11395: the Windows console returns an error (12: not enough
170 # space error) on writing into stdout if stdout mode is binary and the
171 # length is greater than 66,000 bytes (or less, depending on heap
172 # usage).
173 code = "print('x' * 100000)"
174 self.write_windows_console(sys.executable, "-c", code)
175 self.write_windows_console(sys.executable, "-u", "-c", code)
176
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000177 def fdopen_helper(self, *args):
178 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200179 f = os.fdopen(fd, *args)
180 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000181
182 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200183 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
184 os.close(fd)
185
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000186 self.fdopen_helper()
187 self.fdopen_helper('r')
188 self.fdopen_helper('r', 100)
189
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100190 def test_replace(self):
191 TESTFN2 = support.TESTFN + ".2"
192 with open(support.TESTFN, 'w') as f:
193 f.write("1")
194 with open(TESTFN2, 'w') as f:
195 f.write("2")
196 self.addCleanup(os.unlink, TESTFN2)
197 os.replace(support.TESTFN, TESTFN2)
198 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
199 with open(TESTFN2, 'r') as f:
200 self.assertEqual(f.read(), "1")
201
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200202
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000203# Test attributes on return values from os.*stat* family.
204class StatAttributeTests(unittest.TestCase):
205 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000206 os.mkdir(support.TESTFN)
207 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000208 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000209 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000210 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000211
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000212 def tearDown(self):
213 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000214 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000215
Serhiy Storchaka43767632013-11-03 21:31:38 +0200216 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000217 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000218 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000219
220 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000221 self.assertEqual(result[stat.ST_SIZE], 3)
222 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000223
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000224 # Make sure all the attributes are there
225 members = dir(result)
226 for name in dir(stat):
227 if name[:3] == 'ST_':
228 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000229 if name.endswith("TIME"):
230 def trunc(x): return int(x)
231 else:
232 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000233 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000234 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000235 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000236
Larry Hastings6fe20b32012-04-19 15:07:49 -0700237 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700238 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700239 for name in 'st_atime st_mtime st_ctime'.split():
240 floaty = int(getattr(result, name) * 100000)
241 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700242 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700243
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000244 try:
245 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200246 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000247 except IndexError:
248 pass
249
250 # Make sure that assignment fails
251 try:
252 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200253 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000254 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000255 pass
256
257 try:
258 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200259 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000260 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000261 pass
262
263 try:
264 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200265 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000266 except AttributeError:
267 pass
268
269 # Use the stat_result constructor with a too-short tuple.
270 try:
271 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200272 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000273 except TypeError:
274 pass
275
Ezio Melotti42da6632011-03-15 05:18:48 +0200276 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000277 try:
278 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
279 except TypeError:
280 pass
281
Antoine Pitrou38425292010-09-21 18:19:07 +0000282 def test_stat_attributes(self):
283 self.check_stat_attributes(self.fname)
284
285 def test_stat_attributes_bytes(self):
286 try:
287 fname = self.fname.encode(sys.getfilesystemencoding())
288 except UnicodeEncodeError:
289 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100290 with warnings.catch_warnings():
291 warnings.simplefilter("ignore", DeprecationWarning)
292 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000293
Christian Heimes25827622013-10-12 01:27:08 +0200294 def test_stat_result_pickle(self):
295 result = os.stat(self.fname)
296 p = pickle.dumps(result)
297 self.assertIn(b'\x03cos\nstat_result\n', p)
298 unpickled = pickle.loads(p)
299 self.assertEqual(result, unpickled)
300
Serhiy Storchaka43767632013-11-03 21:31:38 +0200301 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000302 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000303 try:
304 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000305 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000306 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000307 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200308 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000309
310 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000311 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000312
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000313 # Make sure all the attributes are there.
314 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
315 'ffree', 'favail', 'flag', 'namemax')
316 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000317 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000318
319 # Make sure that assignment really fails
320 try:
321 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200322 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000323 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000324 pass
325
326 try:
327 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200328 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000329 except AttributeError:
330 pass
331
332 # Use the constructor with a too-short tuple.
333 try:
334 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200335 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000336 except TypeError:
337 pass
338
Ezio Melotti42da6632011-03-15 05:18:48 +0200339 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000340 try:
341 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
342 except TypeError:
343 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000344
Christian Heimes25827622013-10-12 01:27:08 +0200345 @unittest.skipUnless(hasattr(os, 'statvfs'),
346 "need os.statvfs()")
347 def test_statvfs_result_pickle(self):
348 try:
349 result = os.statvfs(self.fname)
350 except OSError as e:
351 # On AtheOS, glibc always returns ENOSYS
352 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200353 self.skipTest('os.statvfs() failed with ENOSYS')
354
Christian Heimes25827622013-10-12 01:27:08 +0200355 p = pickle.dumps(result)
356 self.assertIn(b'\x03cos\nstatvfs_result\n', p)
357 unpickled = pickle.loads(p)
358 self.assertEqual(result, unpickled)
359
Thomas Wouters89f507f2006-12-13 04:49:30 +0000360 def test_utime_dir(self):
361 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000362 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000363 # round to int, because some systems may support sub-second
364 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000365 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
366 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000367 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000368
Larry Hastings76ad59b2012-05-03 00:30:07 -0700369 def _test_utime(self, filename, attr, utime, delta):
Brian Curtin0277aa32011-11-06 13:50:15 -0600370 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600371 # second argument. Check that the previous methods of passing
372 # a time tuple or None work in addition to no argument.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700373 st0 = os.stat(filename)
Brian Curtin52fbea12011-11-06 13:41:17 -0600374 # Doesn't set anything new, but sets the time tuple way
Larry Hastings76ad59b2012-05-03 00:30:07 -0700375 utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
376 # Setting the time to the time you just read, then reading again,
377 # should always return exactly the same times.
378 st1 = os.stat(filename)
379 self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
380 self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
Brian Curtin52fbea12011-11-06 13:41:17 -0600381 # Set to the current time in the old explicit way.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700382 os.utime(filename, None)
Brian Curtin52fbea12011-11-06 13:41:17 -0600383 st2 = os.stat(support.TESTFN)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700384 # Set to the current time in the new way
385 os.utime(filename)
386 st3 = os.stat(filename)
387 self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)
388
389 def test_utime(self):
390 def utime(file, times):
391 return os.utime(file, times)
392 self._test_utime(self.fname, getattr, utime, 10)
393 self._test_utime(support.TESTFN, getattr, utime, 10)
394
395
396 def _test_utime_ns(self, set_times_ns, test_dir=True):
397 def getattr_ns(o, attr):
398 return getattr(o, attr + "_ns")
399 ten_s = 10 * 1000 * 1000 * 1000
400 self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
401 if test_dir:
402 self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)
403
404 def test_utime_ns(self):
405 def utime_ns(file, times):
406 return os.utime(file, ns=times)
407 self._test_utime_ns(utime_ns)
408
Larry Hastings9cf065c2012-06-22 16:30:09 -0700409 requires_utime_dir_fd = unittest.skipUnless(
410 os.utime in os.supports_dir_fd,
411 "dir_fd support for utime required for this test.")
412 requires_utime_fd = unittest.skipUnless(
413 os.utime in os.supports_fd,
414 "fd support for utime required for this test.")
415 requires_utime_nofollow_symlinks = unittest.skipUnless(
416 os.utime in os.supports_follow_symlinks,
417 "follow_symlinks support for utime required for this test.")
Larry Hastings76ad59b2012-05-03 00:30:07 -0700418
Larry Hastings9cf065c2012-06-22 16:30:09 -0700419 @requires_utime_nofollow_symlinks
Larry Hastings76ad59b2012-05-03 00:30:07 -0700420 def test_lutimes_ns(self):
421 def lutimes_ns(file, times):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700422 return os.utime(file, ns=times, follow_symlinks=False)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700423 self._test_utime_ns(lutimes_ns)
424
Larry Hastings9cf065c2012-06-22 16:30:09 -0700425 @requires_utime_fd
Larry Hastings76ad59b2012-05-03 00:30:07 -0700426 def test_futimes_ns(self):
427 def futimes_ns(file, times):
428 with open(file, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700429 os.utime(f.fileno(), ns=times)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700430 self._test_utime_ns(futimes_ns, test_dir=False)
431
432 def _utime_invalid_arguments(self, name, arg):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700433 with self.assertRaises(ValueError):
Larry Hastings76ad59b2012-05-03 00:30:07 -0700434 getattr(os, name)(arg, (5, 5), ns=(5, 5))
435
436 def test_utime_invalid_arguments(self):
437 self._utime_invalid_arguments('utime', self.fname)
438
Brian Curtin52fbea12011-11-06 13:41:17 -0600439
Victor Stinner1aa54a42012-02-08 04:09:37 +0100440 @unittest.skipUnless(stat_supports_subsecond,
441 "os.stat() doesn't has a subsecond resolution")
Victor Stinnera2f7c002012-02-08 03:36:25 +0100442 def _test_utime_subsecond(self, set_time_func):
Victor Stinnerbe557de2012-02-08 03:01:11 +0100443 asec, amsec = 1, 901
444 atime = asec + amsec * 1e-3
Victor Stinnera2f7c002012-02-08 03:36:25 +0100445 msec, mmsec = 2, 901
Victor Stinnerbe557de2012-02-08 03:01:11 +0100446 mtime = msec + mmsec * 1e-3
447 filename = self.fname
Victor Stinnera2f7c002012-02-08 03:36:25 +0100448 os.utime(filename, (0, 0))
449 set_time_func(filename, atime, mtime)
Victor Stinner034d0aa2012-06-05 01:22:15 +0200450 with warnings.catch_warnings():
451 warnings.simplefilter("ignore", DeprecationWarning)
452 os.stat_float_times(True)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100453 st = os.stat(filename)
454 self.assertAlmostEqual(st.st_atime, atime, places=3)
455 self.assertAlmostEqual(st.st_mtime, mtime, places=3)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100456
Victor Stinnera2f7c002012-02-08 03:36:25 +0100457 def test_utime_subsecond(self):
458 def set_time(filename, atime, mtime):
459 os.utime(filename, (atime, mtime))
460 self._test_utime_subsecond(set_time)
461
Larry Hastings9cf065c2012-06-22 16:30:09 -0700462 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100463 def test_futimes_subsecond(self):
464 def set_time(filename, atime, mtime):
465 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700466 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100467 self._test_utime_subsecond(set_time)
468
Larry Hastings9cf065c2012-06-22 16:30:09 -0700469 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100470 def test_futimens_subsecond(self):
471 def set_time(filename, atime, mtime):
472 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700473 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100474 self._test_utime_subsecond(set_time)
475
Larry Hastings9cf065c2012-06-22 16:30:09 -0700476 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100477 def test_futimesat_subsecond(self):
478 def set_time(filename, atime, mtime):
479 dirname = os.path.dirname(filename)
480 dirfd = os.open(dirname, os.O_RDONLY)
481 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700482 os.utime(os.path.basename(filename), dir_fd=dirfd,
483 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100484 finally:
485 os.close(dirfd)
486 self._test_utime_subsecond(set_time)
487
Larry Hastings9cf065c2012-06-22 16:30:09 -0700488 @requires_utime_nofollow_symlinks
Victor Stinnera2f7c002012-02-08 03:36:25 +0100489 def test_lutimes_subsecond(self):
490 def set_time(filename, atime, mtime):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700491 os.utime(filename, (atime, mtime), follow_symlinks=False)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100492 self._test_utime_subsecond(set_time)
493
Larry Hastings9cf065c2012-06-22 16:30:09 -0700494 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100495 def test_utimensat_subsecond(self):
496 def set_time(filename, atime, mtime):
497 dirname = os.path.dirname(filename)
498 dirfd = os.open(dirname, os.O_RDONLY)
499 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700500 os.utime(os.path.basename(filename), dir_fd=dirfd,
501 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100502 finally:
503 os.close(dirfd)
504 self._test_utime_subsecond(set_time)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100505
Serhiy Storchaka43767632013-11-03 21:31:38 +0200506 # Restrict tests to Win32, since there is no guarantee other
Thomas Wouters89f507f2006-12-13 04:49:30 +0000507 # systems support centiseconds
Serhiy Storchaka43767632013-11-03 21:31:38 +0200508 def get_file_system(path):
509 if sys.platform == 'win32':
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000510 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000511 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000512 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000513 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000514 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000515 return buf.value
516
Serhiy Storchaka43767632013-11-03 21:31:38 +0200517 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
518 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
519 "requires NTFS")
520 def test_1565150(self):
521 t1 = 1159195039.25
522 os.utime(self.fname, (t1, t1))
523 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000524
Serhiy Storchaka43767632013-11-03 21:31:38 +0200525 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
526 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
527 "requires NTFS")
528 def test_large_time(self):
529 t1 = 5000000000 # some day in 2128
530 os.utime(self.fname, (t1, t1))
531 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000532
Serhiy Storchaka43767632013-11-03 21:31:38 +0200533 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
534 def test_1686475(self):
535 # Verify that an open file can be stat'ed
536 try:
537 os.stat(r"c:\pagefile.sys")
538 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600539 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200540 except OSError as e:
541 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000542
Serhiy Storchaka43767632013-11-03 21:31:38 +0200543 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
544 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
545 def test_15261(self):
546 # Verify that stat'ing a closed fd does not cause crash
547 r, w = os.pipe()
548 try:
549 os.stat(r) # should not raise error
550 finally:
551 os.close(r)
552 os.close(w)
553 with self.assertRaises(OSError) as ctx:
554 os.stat(r)
555 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100556
Zachary Ware63f277b2014-06-19 09:46:37 -0500557 def check_file_attributes(self, result):
558 self.assertTrue(hasattr(result, 'st_file_attributes'))
559 self.assertTrue(isinstance(result.st_file_attributes, int))
560 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
561
562 @unittest.skipUnless(sys.platform == "win32",
563 "st_file_attributes is Win32 specific")
564 def test_file_attributes(self):
565 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
566 result = os.stat(self.fname)
567 self.check_file_attributes(result)
568 self.assertEqual(
569 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
570 0)
571
572 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
573 result = os.stat(support.TESTFN)
574 self.check_file_attributes(result)
575 self.assertEqual(
576 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
577 stat.FILE_ATTRIBUTE_DIRECTORY)
578
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000579from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000580
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000581class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000582 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000583 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000584
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000585 def setUp(self):
586 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000587 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000588 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000589 for key, value in self._reference().items():
590 os.environ[key] = value
591
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000592 def tearDown(self):
593 os.environ.clear()
594 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000595 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000596 os.environb.clear()
597 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000598
Christian Heimes90333392007-11-01 19:08:42 +0000599 def _reference(self):
600 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
601
602 def _empty_mapping(self):
603 os.environ.clear()
604 return os.environ
605
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000606 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300607 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000608 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000609 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300610 os.environ.update(HELLO="World")
611 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
612 value = popen.read().strip()
613 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000614
Ezio Melottic7e139b2012-09-26 20:01:34 +0300615 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000616 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300617 with os.popen(
618 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
619 it = iter(popen)
620 self.assertEqual(next(it), "line1\n")
621 self.assertEqual(next(it), "line2\n")
622 self.assertEqual(next(it), "line3\n")
623 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000624
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000625 # Verify environ keys and values from the OS are of the
626 # correct str type.
627 def test_keyvalue_types(self):
628 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000629 self.assertEqual(type(key), str)
630 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000631
Christian Heimes90333392007-11-01 19:08:42 +0000632 def test_items(self):
633 for key, value in self._reference().items():
634 self.assertEqual(os.environ.get(key), value)
635
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000636 # Issue 7310
637 def test___repr__(self):
638 """Check that the repr() of os.environ looks like environ({...})."""
639 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000640 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
641 '{!r}: {!r}'.format(key, value)
642 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000643
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000644 def test_get_exec_path(self):
645 defpath_list = os.defpath.split(os.pathsep)
646 test_path = ['/monty', '/python', '', '/flying/circus']
647 test_env = {'PATH': os.pathsep.join(test_path)}
648
649 saved_environ = os.environ
650 try:
651 os.environ = dict(test_env)
652 # Test that defaulting to os.environ works.
653 self.assertSequenceEqual(test_path, os.get_exec_path())
654 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
655 finally:
656 os.environ = saved_environ
657
658 # No PATH environment variable
659 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
660 # Empty PATH environment variable
661 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
662 # Supplied PATH environment variable
663 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
664
Victor Stinnerb745a742010-05-18 17:17:23 +0000665 if os.supports_bytes_environ:
666 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000667 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000668 # ignore BytesWarning warning
669 with warnings.catch_warnings(record=True):
670 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000671 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000672 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000673 pass
674 else:
675 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000676
677 # bytes key and/or value
678 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
679 ['abc'])
680 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
681 ['abc'])
682 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
683 ['abc'])
684
685 @unittest.skipUnless(os.supports_bytes_environ,
686 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000687 def test_environb(self):
688 # os.environ -> os.environb
689 value = 'euro\u20ac'
690 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000691 value_bytes = value.encode(sys.getfilesystemencoding(),
692 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000693 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000694 msg = "U+20AC character is not encodable to %s" % (
695 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000696 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000697 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000698 self.assertEqual(os.environ['unicode'], value)
699 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000700
701 # os.environb -> os.environ
702 value = b'\xff'
703 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000704 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000705 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000706 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000707
Charles-François Natali2966f102011-11-26 11:32:46 +0100708 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
709 # #13415).
710 @support.requires_freebsd_version(7)
711 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100712 def test_unset_error(self):
713 if sys.platform == "win32":
714 # an environment variable is limited to 32,767 characters
715 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100716 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100717 else:
718 # "=" is not allowed in a variable name
719 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100720 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100721
Victor Stinner6d101392013-04-14 16:35:04 +0200722 def test_key_type(self):
723 missing = 'missingkey'
724 self.assertNotIn(missing, os.environ)
725
Victor Stinner839e5ea2013-04-14 16:43:03 +0200726 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200727 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200728 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200729 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200730
Victor Stinner839e5ea2013-04-14 16:43:03 +0200731 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200732 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200733 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200734 self.assertTrue(cm.exception.__suppress_context__)
735
Victor Stinner6d101392013-04-14 16:35:04 +0200736
Tim Petersc4e09402003-04-25 07:11:48 +0000737class WalkTests(unittest.TestCase):
738 """Tests for os.walk()."""
739
Charles-François Natali7372b062012-02-05 15:15:38 +0100740 def setUp(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000741 import os
742 from os.path import join
743
744 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000745 # TESTFN/
746 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000747 # tmp1
748 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000749 # tmp2
750 # SUB11/ no kids
751 # SUB2/ a file kid and a dirsymlink kid
752 # tmp3
753 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200754 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000755 # TEST2/
756 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000757 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000758 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000759 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000760 sub2_path = join(walk_path, "SUB2")
761 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000762 tmp2_path = join(sub1_path, "tmp2")
763 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000764 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000765 t2_path = join(support.TESTFN, "TEST2")
766 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200767 link_path = join(sub2_path, "link")
768 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000769
770 # Create stuff.
771 os.makedirs(sub11_path)
772 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000773 os.makedirs(t2_path)
774 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000775 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000776 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
777 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000778 if support.can_symlink():
Jason R. Coombs3a092862013-05-27 23:21:28 -0400779 os.symlink(os.path.abspath(t2_path), link_path)
Jason R. Coombsb501b562013-05-27 23:52:43 -0400780 os.symlink('broken', broken_link_path, True)
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200781 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
Guido van Rossumd8faa362007-04-27 19:54:29 +0000782 else:
783 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000784
785 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000786 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000787 self.assertEqual(len(all), 4)
788 # We can't know which order SUB1 and SUB2 will appear in.
789 # Not flipped: TESTFN, SUB1, SUB11, SUB2
790 # flipped: TESTFN, SUB2, SUB1, SUB11
791 flipped = all[0][1][0] != "SUB1"
792 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200793 all[3 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000794 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000795 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
796 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000797 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000798
799 # Prune the search.
800 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000801 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000802 all.append((root, dirs, files))
803 # Don't descend into SUB1.
804 if 'SUB1' in dirs:
805 # Note that this also mutates the dirs we appended to all!
806 dirs.remove('SUB1')
807 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000808 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200809 all[1][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000810 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000811
812 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000813 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000814 self.assertEqual(len(all), 4)
815 # We can't know which order SUB1 and SUB2 will appear in.
816 # Not flipped: SUB11, SUB1, SUB2, TESTFN
817 # flipped: SUB2, SUB11, SUB1, TESTFN
818 flipped = all[3][1][0] != "SUB1"
819 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200820 all[2 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000821 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000822 self.assertEqual(all[flipped], (sub11_path, [], []))
823 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000824 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000825
Brian Curtin3b4499c2010-12-28 14:31:47 +0000826 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000827 # Walk, following symlinks.
828 for root, dirs, files in os.walk(walk_path, followlinks=True):
829 if root == link_path:
830 self.assertEqual(dirs, [])
831 self.assertEqual(files, ["tmp4"])
832 break
833 else:
834 self.fail("Didn't follow symlink with followlinks=True")
835
836 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000837 # Tear everything down. This is a decent use for bottom-up on
838 # Windows, which doesn't have a recursive delete command. The
839 # (not so) subtlety is that rmdir will fail unless the dir's
840 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000841 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000842 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000843 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000844 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000845 dirname = os.path.join(root, name)
846 if not os.path.islink(dirname):
847 os.rmdir(dirname)
848 else:
849 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000850 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000851
Charles-François Natali7372b062012-02-05 15:15:38 +0100852
853@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
854class FwalkTests(WalkTests):
855 """Tests for os.fwalk()."""
856
Larry Hastingsc48fe982012-06-25 04:49:05 -0700857 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
858 """
859 compare with walk() results.
860 """
Larry Hastingsb4038062012-07-15 10:57:38 -0700861 walk_kwargs = walk_kwargs.copy()
862 fwalk_kwargs = fwalk_kwargs.copy()
863 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
864 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
865 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -0700866
Charles-François Natali7372b062012-02-05 15:15:38 +0100867 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -0700868 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100869 expected[root] = (set(dirs), set(files))
870
Larry Hastingsc48fe982012-06-25 04:49:05 -0700871 for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100872 self.assertIn(root, expected)
873 self.assertEqual(expected[root], (set(dirs), set(files)))
874
Larry Hastingsc48fe982012-06-25 04:49:05 -0700875 def test_compare_to_walk(self):
876 kwargs = {'top': support.TESTFN}
877 self._compare_to_walk(kwargs, kwargs)
878
Charles-François Natali7372b062012-02-05 15:15:38 +0100879 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -0700880 try:
881 fd = os.open(".", os.O_RDONLY)
882 walk_kwargs = {'top': support.TESTFN}
883 fwalk_kwargs = walk_kwargs.copy()
884 fwalk_kwargs['dir_fd'] = fd
885 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
886 finally:
887 os.close(fd)
888
889 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +0100890 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -0700891 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
892 args = support.TESTFN, topdown, None
893 for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +0100894 # check that the FD is valid
895 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700896 # redundant check
897 os.stat(rootfd)
898 # check that listdir() returns consistent information
899 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100900
901 def test_fd_leak(self):
902 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
903 # we both check that calling fwalk() a large number of times doesn't
904 # yield EMFILE, and that the minimum allocated FD hasn't changed.
905 minfd = os.dup(1)
906 os.close(minfd)
907 for i in range(256):
908 for x in os.fwalk(support.TESTFN):
909 pass
910 newfd = os.dup(1)
911 self.addCleanup(os.close, newfd)
912 self.assertEqual(newfd, minfd)
913
914 def tearDown(self):
915 # cleanup
916 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
917 for name in files:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700918 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100919 for name in dirs:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700920 st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
Larry Hastingsb698d8e2012-06-23 16:55:07 -0700921 if stat.S_ISDIR(st.st_mode):
922 os.rmdir(name, dir_fd=rootfd)
923 else:
924 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100925 os.rmdir(support.TESTFN)
926
927
Guido van Rossume7ba4952007-06-06 23:52:48 +0000928class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000929 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000930 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000931
932 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000933 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000934 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
935 os.makedirs(path) # Should work
936 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
937 os.makedirs(path)
938
939 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000940 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000941 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
942 os.makedirs(path)
943 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
944 'dir5', 'dir6')
945 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000946
Terry Reedy5a22b652010-12-02 07:05:56 +0000947 def test_exist_ok_existing_directory(self):
948 path = os.path.join(support.TESTFN, 'dir1')
949 mode = 0o777
950 old_mask = os.umask(0o022)
951 os.makedirs(path, mode)
952 self.assertRaises(OSError, os.makedirs, path, mode)
953 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -0400954 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +0000955 os.makedirs(path, mode=mode, exist_ok=True)
956 os.umask(old_mask)
957
Terry Jan Reedy4a0b6f72013-08-10 20:58:59 -0400958 @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
Larry Hastingsa27b83a2013-08-08 00:19:50 -0700959 def test_chown_uid_gid_arguments_must_be_index(self):
960 stat = os.stat(support.TESTFN)
961 uid = stat.st_uid
962 gid = stat.st_gid
963 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
964 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
965 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
966 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
967 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
968
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700969 def test_exist_ok_s_isgid_directory(self):
970 path = os.path.join(support.TESTFN, 'dir1')
971 S_ISGID = stat.S_ISGID
972 mode = 0o777
973 old_mask = os.umask(0o022)
974 try:
975 existing_testfn_mode = stat.S_IMODE(
976 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -0700977 try:
978 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -0700979 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -0700980 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700981 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
982 raise unittest.SkipTest('No support for S_ISGID dir mode.')
983 # The os should apply S_ISGID from the parent dir for us, but
984 # this test need not depend on that behavior. Be explicit.
985 os.makedirs(path, mode | S_ISGID)
986 # http://bugs.python.org/issue14992
987 # Should not fail when the bit is already set.
988 os.makedirs(path, mode, exist_ok=True)
989 # remove the bit.
990 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -0400991 # May work even when the bit is not already set when demanded.
992 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700993 finally:
994 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +0000995
996 def test_exist_ok_existing_regular_file(self):
997 base = support.TESTFN
998 path = os.path.join(support.TESTFN, 'dir1')
999 f = open(path, 'w')
1000 f.write('abc')
1001 f.close()
1002 self.assertRaises(OSError, os.makedirs, path)
1003 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1004 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1005 os.remove(path)
1006
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001007 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001008 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001009 'dir4', 'dir5', 'dir6')
1010 # If the tests failed, the bottom-most directory ('../dir6')
1011 # may not have been created, so we look for the outermost directory
1012 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001013 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001014 path = os.path.dirname(path)
1015
1016 os.removedirs(path)
1017
Andrew Svetlov405faed2012-12-25 12:18:09 +02001018
1019class RemoveDirsTests(unittest.TestCase):
1020 def setUp(self):
1021 os.makedirs(support.TESTFN)
1022
1023 def tearDown(self):
1024 support.rmtree(support.TESTFN)
1025
1026 def test_remove_all(self):
1027 dira = os.path.join(support.TESTFN, 'dira')
1028 os.mkdir(dira)
1029 dirb = os.path.join(dira, 'dirb')
1030 os.mkdir(dirb)
1031 os.removedirs(dirb)
1032 self.assertFalse(os.path.exists(dirb))
1033 self.assertFalse(os.path.exists(dira))
1034 self.assertFalse(os.path.exists(support.TESTFN))
1035
1036 def test_remove_partial(self):
1037 dira = os.path.join(support.TESTFN, 'dira')
1038 os.mkdir(dira)
1039 dirb = os.path.join(dira, 'dirb')
1040 os.mkdir(dirb)
1041 with open(os.path.join(dira, 'file.txt'), 'w') as f:
1042 f.write('text')
1043 os.removedirs(dirb)
1044 self.assertFalse(os.path.exists(dirb))
1045 self.assertTrue(os.path.exists(dira))
1046 self.assertTrue(os.path.exists(support.TESTFN))
1047
1048 def test_remove_nothing(self):
1049 dira = os.path.join(support.TESTFN, 'dira')
1050 os.mkdir(dira)
1051 dirb = os.path.join(dira, 'dirb')
1052 os.mkdir(dirb)
1053 with open(os.path.join(dirb, 'file.txt'), 'w') as f:
1054 f.write('text')
1055 with self.assertRaises(OSError):
1056 os.removedirs(dirb)
1057 self.assertTrue(os.path.exists(dirb))
1058 self.assertTrue(os.path.exists(dira))
1059 self.assertTrue(os.path.exists(support.TESTFN))
1060
1061
Guido van Rossume7ba4952007-06-06 23:52:48 +00001062class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001063 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +02001064 with open(os.devnull, 'wb') as f:
1065 f.write(b'hello')
1066 f.close()
1067 with open(os.devnull, 'rb') as f:
1068 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001069
Andrew Svetlov405faed2012-12-25 12:18:09 +02001070
Guido van Rossume7ba4952007-06-06 23:52:48 +00001071class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001072 def test_urandom_length(self):
1073 self.assertEqual(len(os.urandom(0)), 0)
1074 self.assertEqual(len(os.urandom(1)), 1)
1075 self.assertEqual(len(os.urandom(10)), 10)
1076 self.assertEqual(len(os.urandom(100)), 100)
1077 self.assertEqual(len(os.urandom(1000)), 1000)
1078
1079 def test_urandom_value(self):
1080 data1 = os.urandom(16)
1081 data2 = os.urandom(16)
1082 self.assertNotEqual(data1, data2)
1083
1084 def get_urandom_subprocess(self, count):
1085 code = '\n'.join((
1086 'import os, sys',
1087 'data = os.urandom(%s)' % count,
1088 'sys.stdout.buffer.write(data)',
1089 'sys.stdout.buffer.flush()'))
1090 out = assert_python_ok('-c', code)
1091 stdout = out[1]
1092 self.assertEqual(len(stdout), 16)
1093 return stdout
1094
1095 def test_urandom_subprocess(self):
1096 data1 = self.get_urandom_subprocess(16)
1097 data2 = self.get_urandom_subprocess(16)
1098 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001099
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001100 @unittest.skipUnless(resource, "test requires the resource module")
1101 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001102 # Check urandom() failing when it is not able to open /dev/random.
1103 # We spawn a new process to make the test more robust (if getrlimit()
1104 # failed to restore the file descriptor limit after this, the whole
1105 # test suite would crash; this actually happened on the OS X Tiger
1106 # buildbot).
1107 code = """if 1:
1108 import errno
1109 import os
1110 import resource
1111
1112 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1113 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1114 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001115 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001116 except OSError as e:
1117 assert e.errno == errno.EMFILE, e.errno
1118 else:
1119 raise AssertionError("OSError not raised")
1120 """
1121 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001122
Antoine Pitroue472aea2014-04-26 14:33:03 +02001123 def test_urandom_fd_closed(self):
1124 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1125 # closed.
1126 code = """if 1:
1127 import os
1128 import sys
1129 os.urandom(4)
1130 os.closerange(3, 256)
1131 sys.stdout.buffer.write(os.urandom(4))
1132 """
1133 rc, out, err = assert_python_ok('-Sc', code)
1134
1135 def test_urandom_fd_reopened(self):
1136 # Issue #21207: urandom() should detect its fd to /dev/urandom
1137 # changed to something else, and reopen it.
1138 with open(support.TESTFN, 'wb') as f:
1139 f.write(b"x" * 256)
1140 self.addCleanup(os.unlink, support.TESTFN)
1141 code = """if 1:
1142 import os
1143 import sys
1144 os.urandom(4)
1145 for fd in range(3, 256):
1146 try:
1147 os.close(fd)
1148 except OSError:
1149 pass
1150 else:
1151 # Found the urandom fd (XXX hopefully)
1152 break
1153 os.closerange(3, 256)
1154 with open({TESTFN!r}, 'rb') as f:
1155 os.dup2(f.fileno(), fd)
1156 sys.stdout.buffer.write(os.urandom(4))
1157 sys.stdout.buffer.write(os.urandom(4))
1158 """.format(TESTFN=support.TESTFN)
1159 rc, out, err = assert_python_ok('-Sc', code)
1160 self.assertEqual(len(out), 8)
1161 self.assertNotEqual(out[0:4], out[4:8])
1162 rc, out2, err2 = assert_python_ok('-Sc', code)
1163 self.assertEqual(len(out2), 8)
1164 self.assertNotEqual(out2, out)
1165
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001166
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001167@contextlib.contextmanager
1168def _execvpe_mockup(defpath=None):
1169 """
1170 Stubs out execv and execve functions when used as context manager.
1171 Records exec calls. The mock execv and execve functions always raise an
1172 exception as they would normally never return.
1173 """
1174 # A list of tuples containing (function name, first arg, args)
1175 # of calls to execv or execve that have been made.
1176 calls = []
1177
1178 def mock_execv(name, *args):
1179 calls.append(('execv', name, args))
1180 raise RuntimeError("execv called")
1181
1182 def mock_execve(name, *args):
1183 calls.append(('execve', name, args))
1184 raise OSError(errno.ENOTDIR, "execve called")
1185
1186 try:
1187 orig_execv = os.execv
1188 orig_execve = os.execve
1189 orig_defpath = os.defpath
1190 os.execv = mock_execv
1191 os.execve = mock_execve
1192 if defpath is not None:
1193 os.defpath = defpath
1194 yield calls
1195 finally:
1196 os.execv = orig_execv
1197 os.execve = orig_execve
1198 os.defpath = orig_defpath
1199
Guido van Rossume7ba4952007-06-06 23:52:48 +00001200class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001201 @unittest.skipIf(USING_LINUXTHREADS,
1202 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001203 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001204 self.assertRaises(OSError, os.execvpe, 'no such app-',
1205 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001206
Thomas Heller6790d602007-08-30 17:15:14 +00001207 def test_execvpe_with_bad_arglist(self):
1208 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1209
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001210 @unittest.skipUnless(hasattr(os, '_execvpe'),
1211 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001212 def _test_internal_execvpe(self, test_type):
1213 program_path = os.sep + 'absolutepath'
1214 if test_type is bytes:
1215 program = b'executable'
1216 fullpath = os.path.join(os.fsencode(program_path), program)
1217 native_fullpath = fullpath
1218 arguments = [b'progname', 'arg1', 'arg2']
1219 else:
1220 program = 'executable'
1221 arguments = ['progname', 'arg1', 'arg2']
1222 fullpath = os.path.join(program_path, program)
1223 if os.name != "nt":
1224 native_fullpath = os.fsencode(fullpath)
1225 else:
1226 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001227 env = {'spam': 'beans'}
1228
Victor Stinnerb745a742010-05-18 17:17:23 +00001229 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001230 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001231 self.assertRaises(RuntimeError,
1232 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001233 self.assertEqual(len(calls), 1)
1234 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1235
Victor Stinnerb745a742010-05-18 17:17:23 +00001236 # test os._execvpe() with a relative path:
1237 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001238 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001239 self.assertRaises(OSError,
1240 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001241 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001242 self.assertSequenceEqual(calls[0],
1243 ('execve', native_fullpath, (arguments, env)))
1244
1245 # test os._execvpe() with a relative path:
1246 # os.get_exec_path() reads the 'PATH' variable
1247 with _execvpe_mockup() as calls:
1248 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001249 if test_type is bytes:
1250 env_path[b'PATH'] = program_path
1251 else:
1252 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001253 self.assertRaises(OSError,
1254 os._execvpe, program, arguments, env=env_path)
1255 self.assertEqual(len(calls), 1)
1256 self.assertSequenceEqual(calls[0],
1257 ('execve', native_fullpath, (arguments, env_path)))
1258
1259 def test_internal_execvpe_str(self):
1260 self._test_internal_execvpe(str)
1261 if os.name != "nt":
1262 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001263
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001264
Serhiy Storchaka43767632013-11-03 21:31:38 +02001265@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001266class Win32ErrorTests(unittest.TestCase):
1267 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001268 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001269
1270 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001271 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001272
1273 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001274 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001275
1276 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001277 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001278 try:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001279 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Benjamin Petersonf91df042009-02-13 02:50:59 +00001280 finally:
1281 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001282 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001283
1284 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001285 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001286
Thomas Wouters477c8d52006-05-27 19:21:47 +00001287 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001288 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001289
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001290class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001291 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001292 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1293 #singles.append("close")
1294 #We omit close because it doesn'r raise an exception on some platforms
1295 def get_single(f):
1296 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001297 if hasattr(os, f):
1298 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001299 return helper
1300 for f in singles:
1301 locals()["test_"+f] = get_single(f)
1302
Benjamin Peterson7522c742009-01-19 21:00:09 +00001303 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001304 try:
1305 f(support.make_bad_fd(), *args)
1306 except OSError as e:
1307 self.assertEqual(e.errno, errno.EBADF)
1308 else:
1309 self.fail("%r didn't raise a OSError with a bad file descriptor"
1310 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001311
Serhiy Storchaka43767632013-11-03 21:31:38 +02001312 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001313 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001314 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001315
Serhiy Storchaka43767632013-11-03 21:31:38 +02001316 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001317 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001318 fd = support.make_bad_fd()
1319 # Make sure none of the descriptors we are about to close are
1320 # currently valid (issue 6542).
1321 for i in range(10):
1322 try: os.fstat(fd+i)
1323 except OSError:
1324 pass
1325 else:
1326 break
1327 if i < 2:
1328 raise unittest.SkipTest(
1329 "Unable to acquire a range of invalid file descriptors")
1330 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001331
Serhiy Storchaka43767632013-11-03 21:31:38 +02001332 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001333 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001334 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001335
Serhiy Storchaka43767632013-11-03 21:31:38 +02001336 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001337 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001338 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001339
Serhiy Storchaka43767632013-11-03 21:31:38 +02001340 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001341 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001342 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001343
Serhiy Storchaka43767632013-11-03 21:31:38 +02001344 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001345 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001346 self.check(os.pathconf, "PC_NAME_MAX")
1347 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001348
Serhiy Storchaka43767632013-11-03 21:31:38 +02001349 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001350 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001351 self.check(os.truncate, 0)
1352 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001353
Serhiy Storchaka43767632013-11-03 21:31:38 +02001354 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001355 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001356 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001357
Serhiy Storchaka43767632013-11-03 21:31:38 +02001358 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001359 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001360 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001361
Victor Stinner57ddf782014-01-08 15:21:28 +01001362 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1363 def test_readv(self):
1364 buf = bytearray(10)
1365 self.check(os.readv, [buf])
1366
Serhiy Storchaka43767632013-11-03 21:31:38 +02001367 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001368 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001369 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001370
Serhiy Storchaka43767632013-11-03 21:31:38 +02001371 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001372 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001373 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001374
Victor Stinner57ddf782014-01-08 15:21:28 +01001375 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1376 def test_writev(self):
1377 self.check(os.writev, [b'abc'])
1378
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001379 def test_inheritable(self):
1380 self.check(os.get_inheritable)
1381 self.check(os.set_inheritable, True)
1382
1383 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1384 'needs os.get_blocking() and os.set_blocking()')
1385 def test_blocking(self):
1386 self.check(os.get_blocking)
1387 self.check(os.set_blocking, True)
1388
Brian Curtin1b9df392010-11-24 20:24:31 +00001389
1390class LinkTests(unittest.TestCase):
1391 def setUp(self):
1392 self.file1 = support.TESTFN
1393 self.file2 = os.path.join(support.TESTFN + "2")
1394
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001395 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001396 for file in (self.file1, self.file2):
1397 if os.path.exists(file):
1398 os.unlink(file)
1399
Brian Curtin1b9df392010-11-24 20:24:31 +00001400 def _test_link(self, file1, file2):
1401 with open(file1, "w") as f1:
1402 f1.write("test")
1403
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001404 with warnings.catch_warnings():
1405 warnings.simplefilter("ignore", DeprecationWarning)
1406 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001407 with open(file1, "r") as f1, open(file2, "r") as f2:
1408 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1409
1410 def test_link(self):
1411 self._test_link(self.file1, self.file2)
1412
1413 def test_link_bytes(self):
1414 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1415 bytes(self.file2, sys.getfilesystemencoding()))
1416
Brian Curtinf498b752010-11-30 15:54:04 +00001417 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001418 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001419 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001420 except UnicodeError:
1421 raise unittest.SkipTest("Unable to encode for this platform.")
1422
Brian Curtinf498b752010-11-30 15:54:04 +00001423 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001424 self.file2 = self.file1 + "2"
1425 self._test_link(self.file1, self.file2)
1426
Serhiy Storchaka43767632013-11-03 21:31:38 +02001427@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1428class PosixUidGidTests(unittest.TestCase):
1429 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1430 def test_setuid(self):
1431 if os.getuid() != 0:
1432 self.assertRaises(OSError, os.setuid, 0)
1433 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001434
Serhiy Storchaka43767632013-11-03 21:31:38 +02001435 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1436 def test_setgid(self):
1437 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1438 self.assertRaises(OSError, os.setgid, 0)
1439 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001440
Serhiy Storchaka43767632013-11-03 21:31:38 +02001441 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1442 def test_seteuid(self):
1443 if os.getuid() != 0:
1444 self.assertRaises(OSError, os.seteuid, 0)
1445 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001446
Serhiy Storchaka43767632013-11-03 21:31:38 +02001447 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1448 def test_setegid(self):
1449 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1450 self.assertRaises(OSError, os.setegid, 0)
1451 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001452
Serhiy Storchaka43767632013-11-03 21:31:38 +02001453 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1454 def test_setreuid(self):
1455 if os.getuid() != 0:
1456 self.assertRaises(OSError, os.setreuid, 0, 0)
1457 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1458 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001459
Serhiy Storchaka43767632013-11-03 21:31:38 +02001460 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1461 def test_setreuid_neg1(self):
1462 # Needs to accept -1. We run this in a subprocess to avoid
1463 # altering the test runner's process state (issue8045).
1464 subprocess.check_call([
1465 sys.executable, '-c',
1466 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001467
Serhiy Storchaka43767632013-11-03 21:31:38 +02001468 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1469 def test_setregid(self):
1470 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1471 self.assertRaises(OSError, os.setregid, 0, 0)
1472 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1473 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001474
Serhiy Storchaka43767632013-11-03 21:31:38 +02001475 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1476 def test_setregid_neg1(self):
1477 # Needs to accept -1. We run this in a subprocess to avoid
1478 # altering the test runner's process state (issue8045).
1479 subprocess.check_call([
1480 sys.executable, '-c',
1481 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001482
Serhiy Storchaka43767632013-11-03 21:31:38 +02001483@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1484class Pep383Tests(unittest.TestCase):
1485 def setUp(self):
1486 if support.TESTFN_UNENCODABLE:
1487 self.dir = support.TESTFN_UNENCODABLE
1488 elif support.TESTFN_NONASCII:
1489 self.dir = support.TESTFN_NONASCII
1490 else:
1491 self.dir = support.TESTFN
1492 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001493
Serhiy Storchaka43767632013-11-03 21:31:38 +02001494 bytesfn = []
1495 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001496 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001497 fn = os.fsencode(fn)
1498 except UnicodeEncodeError:
1499 return
1500 bytesfn.append(fn)
1501 add_filename(support.TESTFN_UNICODE)
1502 if support.TESTFN_UNENCODABLE:
1503 add_filename(support.TESTFN_UNENCODABLE)
1504 if support.TESTFN_NONASCII:
1505 add_filename(support.TESTFN_NONASCII)
1506 if not bytesfn:
1507 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001508
Serhiy Storchaka43767632013-11-03 21:31:38 +02001509 self.unicodefn = set()
1510 os.mkdir(self.dir)
1511 try:
1512 for fn in bytesfn:
1513 support.create_empty_file(os.path.join(self.bdir, fn))
1514 fn = os.fsdecode(fn)
1515 if fn in self.unicodefn:
1516 raise ValueError("duplicate filename")
1517 self.unicodefn.add(fn)
1518 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001519 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001520 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001521
Serhiy Storchaka43767632013-11-03 21:31:38 +02001522 def tearDown(self):
1523 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001524
Serhiy Storchaka43767632013-11-03 21:31:38 +02001525 def test_listdir(self):
1526 expected = self.unicodefn
1527 found = set(os.listdir(self.dir))
1528 self.assertEqual(found, expected)
1529 # test listdir without arguments
1530 current_directory = os.getcwd()
1531 try:
1532 os.chdir(os.sep)
1533 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1534 finally:
1535 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001536
Serhiy Storchaka43767632013-11-03 21:31:38 +02001537 def test_open(self):
1538 for fn in self.unicodefn:
1539 f = open(os.path.join(self.dir, fn), 'rb')
1540 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001541
Serhiy Storchaka43767632013-11-03 21:31:38 +02001542 @unittest.skipUnless(hasattr(os, 'statvfs'),
1543 "need os.statvfs()")
1544 def test_statvfs(self):
1545 # issue #9645
1546 for fn in self.unicodefn:
1547 # should not fail with file not found error
1548 fullname = os.path.join(self.dir, fn)
1549 os.statvfs(fullname)
1550
1551 def test_stat(self):
1552 for fn in self.unicodefn:
1553 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001554
Brian Curtineb24d742010-04-12 17:16:38 +00001555@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1556class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001557 def _kill(self, sig):
1558 # Start sys.executable as a subprocess and communicate from the
1559 # subprocess to the parent that the interpreter is ready. When it
1560 # becomes ready, send *sig* via os.kill to the subprocess and check
1561 # that the return code is equal to *sig*.
1562 import ctypes
1563 from ctypes import wintypes
1564 import msvcrt
1565
1566 # Since we can't access the contents of the process' stdout until the
1567 # process has exited, use PeekNamedPipe to see what's inside stdout
1568 # without waiting. This is done so we can tell that the interpreter
1569 # is started and running at a point where it could handle a signal.
1570 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1571 PeekNamedPipe.restype = wintypes.BOOL
1572 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1573 ctypes.POINTER(ctypes.c_char), # stdout buf
1574 wintypes.DWORD, # Buffer size
1575 ctypes.POINTER(wintypes.DWORD), # bytes read
1576 ctypes.POINTER(wintypes.DWORD), # bytes avail
1577 ctypes.POINTER(wintypes.DWORD)) # bytes left
1578 msg = "running"
1579 proc = subprocess.Popen([sys.executable, "-c",
1580 "import sys;"
1581 "sys.stdout.write('{}');"
1582 "sys.stdout.flush();"
1583 "input()".format(msg)],
1584 stdout=subprocess.PIPE,
1585 stderr=subprocess.PIPE,
1586 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001587 self.addCleanup(proc.stdout.close)
1588 self.addCleanup(proc.stderr.close)
1589 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001590
1591 count, max = 0, 100
1592 while count < max and proc.poll() is None:
1593 # Create a string buffer to store the result of stdout from the pipe
1594 buf = ctypes.create_string_buffer(len(msg))
1595 # Obtain the text currently in proc.stdout
1596 # Bytes read/avail/left are left as NULL and unused
1597 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1598 buf, ctypes.sizeof(buf), None, None, None)
1599 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1600 if buf.value:
1601 self.assertEqual(msg, buf.value.decode())
1602 break
1603 time.sleep(0.1)
1604 count += 1
1605 else:
1606 self.fail("Did not receive communication from the subprocess")
1607
Brian Curtineb24d742010-04-12 17:16:38 +00001608 os.kill(proc.pid, sig)
1609 self.assertEqual(proc.wait(), sig)
1610
1611 def test_kill_sigterm(self):
1612 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001613 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001614
1615 def test_kill_int(self):
1616 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001617 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001618
1619 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001620 tagname = "test_os_%s" % uuid.uuid1()
1621 m = mmap.mmap(-1, 1, tagname)
1622 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001623 # Run a script which has console control handling enabled.
1624 proc = subprocess.Popen([sys.executable,
1625 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001626 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001627 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1628 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001629 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001630 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001631 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001632 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001633 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001634 count += 1
1635 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001636 # Forcefully kill the process if we weren't able to signal it.
1637 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001638 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001639 os.kill(proc.pid, event)
1640 # proc.send_signal(event) could also be done here.
1641 # Allow time for the signal to be passed and the process to exit.
1642 time.sleep(0.5)
1643 if not proc.poll():
1644 # Forcefully kill the process if we weren't able to signal it.
1645 os.kill(proc.pid, signal.SIGINT)
1646 self.fail("subprocess did not stop on {}".format(name))
1647
1648 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1649 def test_CTRL_C_EVENT(self):
1650 from ctypes import wintypes
1651 import ctypes
1652
1653 # Make a NULL value by creating a pointer with no argument.
1654 NULL = ctypes.POINTER(ctypes.c_int)()
1655 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1656 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1657 wintypes.BOOL)
1658 SetConsoleCtrlHandler.restype = wintypes.BOOL
1659
1660 # Calling this with NULL and FALSE causes the calling process to
1661 # handle CTRL+C, rather than ignore it. This property is inherited
1662 # by subprocesses.
1663 SetConsoleCtrlHandler(NULL, 0)
1664
1665 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1666
1667 def test_CTRL_BREAK_EVENT(self):
1668 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1669
1670
Brian Curtind40e6f72010-07-08 21:39:08 +00001671@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001672class Win32ListdirTests(unittest.TestCase):
1673 """Test listdir on Windows."""
1674
1675 def setUp(self):
1676 self.created_paths = []
1677 for i in range(2):
1678 dir_name = 'SUB%d' % i
1679 dir_path = os.path.join(support.TESTFN, dir_name)
1680 file_name = 'FILE%d' % i
1681 file_path = os.path.join(support.TESTFN, file_name)
1682 os.makedirs(dir_path)
1683 with open(file_path, 'w') as f:
1684 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1685 self.created_paths.extend([dir_name, file_name])
1686 self.created_paths.sort()
1687
1688 def tearDown(self):
1689 shutil.rmtree(support.TESTFN)
1690
1691 def test_listdir_no_extended_path(self):
1692 """Test when the path is not an "extended" path."""
1693 # unicode
1694 self.assertEqual(
1695 sorted(os.listdir(support.TESTFN)),
1696 self.created_paths)
1697 # bytes
1698 self.assertEqual(
1699 sorted(os.listdir(os.fsencode(support.TESTFN))),
1700 [os.fsencode(path) for path in self.created_paths])
1701
1702 def test_listdir_extended_path(self):
1703 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01001704 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01001705 # unicode
1706 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
1707 self.assertEqual(
1708 sorted(os.listdir(path)),
1709 self.created_paths)
1710 # bytes
1711 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
1712 self.assertEqual(
1713 sorted(os.listdir(path)),
1714 [os.fsencode(path) for path in self.created_paths])
1715
1716
1717@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001718@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001719class Win32SymlinkTests(unittest.TestCase):
1720 filelink = 'filelinktest'
1721 filelink_target = os.path.abspath(__file__)
1722 dirlink = 'dirlinktest'
1723 dirlink_target = os.path.dirname(filelink_target)
1724 missing_link = 'missing link'
1725
1726 def setUp(self):
1727 assert os.path.exists(self.dirlink_target)
1728 assert os.path.exists(self.filelink_target)
1729 assert not os.path.exists(self.dirlink)
1730 assert not os.path.exists(self.filelink)
1731 assert not os.path.exists(self.missing_link)
1732
1733 def tearDown(self):
1734 if os.path.exists(self.filelink):
1735 os.remove(self.filelink)
1736 if os.path.exists(self.dirlink):
1737 os.rmdir(self.dirlink)
1738 if os.path.lexists(self.missing_link):
1739 os.remove(self.missing_link)
1740
1741 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001742 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001743 self.assertTrue(os.path.exists(self.dirlink))
1744 self.assertTrue(os.path.isdir(self.dirlink))
1745 self.assertTrue(os.path.islink(self.dirlink))
1746 self.check_stat(self.dirlink, self.dirlink_target)
1747
1748 def test_file_link(self):
1749 os.symlink(self.filelink_target, self.filelink)
1750 self.assertTrue(os.path.exists(self.filelink))
1751 self.assertTrue(os.path.isfile(self.filelink))
1752 self.assertTrue(os.path.islink(self.filelink))
1753 self.check_stat(self.filelink, self.filelink_target)
1754
1755 def _create_missing_dir_link(self):
1756 'Create a "directory" link to a non-existent target'
1757 linkname = self.missing_link
1758 if os.path.lexists(linkname):
1759 os.remove(linkname)
1760 target = r'c:\\target does not exist.29r3c740'
1761 assert not os.path.exists(target)
1762 target_is_dir = True
1763 os.symlink(target, linkname, target_is_dir)
1764
1765 def test_remove_directory_link_to_missing_target(self):
1766 self._create_missing_dir_link()
1767 # For compatibility with Unix, os.remove will check the
1768 # directory status and call RemoveDirectory if the symlink
1769 # was created with target_is_dir==True.
1770 os.remove(self.missing_link)
1771
1772 @unittest.skip("currently fails; consider for improvement")
1773 def test_isdir_on_directory_link_to_missing_target(self):
1774 self._create_missing_dir_link()
1775 # consider having isdir return true for directory links
1776 self.assertTrue(os.path.isdir(self.missing_link))
1777
1778 @unittest.skip("currently fails; consider for improvement")
1779 def test_rmdir_on_directory_link_to_missing_target(self):
1780 self._create_missing_dir_link()
1781 # consider allowing rmdir to remove directory links
1782 os.rmdir(self.missing_link)
1783
1784 def check_stat(self, link, target):
1785 self.assertEqual(os.stat(link), os.stat(target))
1786 self.assertNotEqual(os.lstat(link), os.stat(link))
1787
Brian Curtind25aef52011-06-13 15:16:04 -05001788 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001789 with warnings.catch_warnings():
1790 warnings.simplefilter("ignore", DeprecationWarning)
1791 self.assertEqual(os.stat(bytes_link), os.stat(target))
1792 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001793
1794 def test_12084(self):
1795 level1 = os.path.abspath(support.TESTFN)
1796 level2 = os.path.join(level1, "level2")
1797 level3 = os.path.join(level2, "level3")
1798 try:
1799 os.mkdir(level1)
1800 os.mkdir(level2)
1801 os.mkdir(level3)
1802
1803 file1 = os.path.abspath(os.path.join(level1, "file1"))
1804
1805 with open(file1, "w") as f:
1806 f.write("file1")
1807
1808 orig_dir = os.getcwd()
1809 try:
1810 os.chdir(level2)
1811 link = os.path.join(level2, "link")
1812 os.symlink(os.path.relpath(file1), "link")
1813 self.assertIn("link", os.listdir(os.getcwd()))
1814
1815 # Check os.stat calls from the same dir as the link
1816 self.assertEqual(os.stat(file1), os.stat("link"))
1817
1818 # Check os.stat calls from a dir below the link
1819 os.chdir(level1)
1820 self.assertEqual(os.stat(file1),
1821 os.stat(os.path.relpath(link)))
1822
1823 # Check os.stat calls from a dir above the link
1824 os.chdir(level3)
1825 self.assertEqual(os.stat(file1),
1826 os.stat(os.path.relpath(link)))
1827 finally:
1828 os.chdir(orig_dir)
1829 except OSError as err:
1830 self.fail(err)
1831 finally:
1832 os.remove(file1)
1833 shutil.rmtree(level1)
1834
Brian Curtind40e6f72010-07-08 21:39:08 +00001835
Tim Golden0321cf22014-05-05 19:46:17 +01001836@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1837class Win32JunctionTests(unittest.TestCase):
1838 junction = 'junctiontest'
1839 junction_target = os.path.dirname(os.path.abspath(__file__))
1840
1841 def setUp(self):
1842 assert os.path.exists(self.junction_target)
1843 assert not os.path.exists(self.junction)
1844
1845 def tearDown(self):
1846 if os.path.exists(self.junction):
1847 # os.rmdir delegates to Windows' RemoveDirectoryW,
1848 # which removes junction points safely.
1849 os.rmdir(self.junction)
1850
1851 def test_create_junction(self):
1852 _winapi.CreateJunction(self.junction_target, self.junction)
1853 self.assertTrue(os.path.exists(self.junction))
1854 self.assertTrue(os.path.isdir(self.junction))
1855
1856 # Junctions are not recognized as links.
1857 self.assertFalse(os.path.islink(self.junction))
1858
1859 def test_unlink_removes_junction(self):
1860 _winapi.CreateJunction(self.junction_target, self.junction)
1861 self.assertTrue(os.path.exists(self.junction))
1862
1863 os.unlink(self.junction)
1864 self.assertFalse(os.path.exists(self.junction))
1865
1866
Jason R. Coombs3a092862013-05-27 23:21:28 -04001867@support.skip_unless_symlink
1868class NonLocalSymlinkTests(unittest.TestCase):
1869
1870 def setUp(self):
1871 """
1872 Create this structure:
1873
1874 base
1875 \___ some_dir
1876 """
1877 os.makedirs('base/some_dir')
1878
1879 def tearDown(self):
1880 shutil.rmtree('base')
1881
1882 def test_directory_link_nonlocal(self):
1883 """
1884 The symlink target should resolve relative to the link, not relative
1885 to the current directory.
1886
1887 Then, link base/some_link -> base/some_dir and ensure that some_link
1888 is resolved as a directory.
1889
1890 In issue13772, it was discovered that directory detection failed if
1891 the symlink target was not specified relative to the current
1892 directory, which was a defect in the implementation.
1893 """
1894 src = os.path.join('base', 'some_link')
1895 os.symlink('some_dir', src)
1896 assert os.path.isdir(src)
1897
1898
Victor Stinnere8d51452010-08-19 01:05:19 +00001899class FSEncodingTests(unittest.TestCase):
1900 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001901 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1902 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001903
Victor Stinnere8d51452010-08-19 01:05:19 +00001904 def test_identity(self):
1905 # assert fsdecode(fsencode(x)) == x
1906 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1907 try:
1908 bytesfn = os.fsencode(fn)
1909 except UnicodeEncodeError:
1910 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001911 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001912
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001913
Brett Cannonefb00c02012-02-29 18:31:31 -05001914
1915class DeviceEncodingTests(unittest.TestCase):
1916
1917 def test_bad_fd(self):
1918 # Return None when an fd doesn't actually exist.
1919 self.assertIsNone(os.device_encoding(123456))
1920
Philip Jenveye308b7c2012-02-29 16:16:15 -08001921 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
1922 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08001923 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05001924 def test_device_encoding(self):
1925 encoding = os.device_encoding(0)
1926 self.assertIsNotNone(encoding)
1927 self.assertTrue(codecs.lookup(encoding))
1928
1929
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001930class PidTests(unittest.TestCase):
1931 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1932 def test_getppid(self):
1933 p = subprocess.Popen([sys.executable, '-c',
1934 'import os; print(os.getppid())'],
1935 stdout=subprocess.PIPE)
1936 stdout, _ = p.communicate()
1937 # We are the parent of our subprocess
1938 self.assertEqual(int(stdout), os.getpid())
1939
1940
Brian Curtin0151b8e2010-09-24 13:43:43 +00001941# The introduction of this TestCase caused at least two different errors on
1942# *nix buildbots. Temporarily skip this to let the buildbots move along.
1943@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001944@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1945class LoginTests(unittest.TestCase):
1946 def test_getlogin(self):
1947 user_name = os.getlogin()
1948 self.assertNotEqual(len(user_name), 0)
1949
1950
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001951@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1952 "needs os.getpriority and os.setpriority")
1953class ProgramPriorityTests(unittest.TestCase):
1954 """Tests for os.getpriority() and os.setpriority()."""
1955
1956 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001957
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001958 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1959 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1960 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001961 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1962 if base >= 19 and new_prio <= 19:
1963 raise unittest.SkipTest(
1964 "unable to reliably test setpriority at current nice level of %s" % base)
1965 else:
1966 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001967 finally:
1968 try:
1969 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1970 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001971 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001972 raise
1973
1974
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001975if threading is not None:
1976 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001977
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001978 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001979
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001980 def __init__(self, conn):
1981 asynchat.async_chat.__init__(self, conn)
1982 self.in_buffer = []
1983 self.closed = False
1984 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001985
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001986 def handle_read(self):
1987 data = self.recv(4096)
1988 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001989
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001990 def get_data(self):
1991 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001992
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001993 def handle_close(self):
1994 self.close()
1995 self.closed = True
1996
1997 def handle_error(self):
1998 raise
1999
2000 def __init__(self, address):
2001 threading.Thread.__init__(self)
2002 asyncore.dispatcher.__init__(self)
2003 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2004 self.bind(address)
2005 self.listen(5)
2006 self.host, self.port = self.socket.getsockname()[:2]
2007 self.handler_instance = None
2008 self._active = False
2009 self._active_lock = threading.Lock()
2010
2011 # --- public API
2012
2013 @property
2014 def running(self):
2015 return self._active
2016
2017 def start(self):
2018 assert not self.running
2019 self.__flag = threading.Event()
2020 threading.Thread.start(self)
2021 self.__flag.wait()
2022
2023 def stop(self):
2024 assert self.running
2025 self._active = False
2026 self.join()
2027
2028 def wait(self):
2029 # wait for handler connection to be closed, then stop the server
2030 while not getattr(self.handler_instance, "closed", False):
2031 time.sleep(0.001)
2032 self.stop()
2033
2034 # --- internals
2035
2036 def run(self):
2037 self._active = True
2038 self.__flag.set()
2039 while self._active and asyncore.socket_map:
2040 self._active_lock.acquire()
2041 asyncore.loop(timeout=0.001, count=1)
2042 self._active_lock.release()
2043 asyncore.close_all()
2044
2045 def handle_accept(self):
2046 conn, addr = self.accept()
2047 self.handler_instance = self.Handler(conn)
2048
2049 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002050 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002051 handle_read = handle_connect
2052
2053 def writable(self):
2054 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002055
2056 def handle_error(self):
2057 raise
2058
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002059
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002060@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002061@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2062class TestSendfile(unittest.TestCase):
2063
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002064 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002065 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002066 not sys.platform.startswith("solaris") and \
2067 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002068 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2069 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002070
2071 @classmethod
2072 def setUpClass(cls):
2073 with open(support.TESTFN, "wb") as f:
2074 f.write(cls.DATA)
2075
2076 @classmethod
2077 def tearDownClass(cls):
2078 support.unlink(support.TESTFN)
2079
2080 def setUp(self):
2081 self.server = SendfileTestServer((support.HOST, 0))
2082 self.server.start()
2083 self.client = socket.socket()
2084 self.client.connect((self.server.host, self.server.port))
2085 self.client.settimeout(1)
2086 # synchronize by waiting for "220 ready" response
2087 self.client.recv(1024)
2088 self.sockno = self.client.fileno()
2089 self.file = open(support.TESTFN, 'rb')
2090 self.fileno = self.file.fileno()
2091
2092 def tearDown(self):
2093 self.file.close()
2094 self.client.close()
2095 if self.server.running:
2096 self.server.stop()
2097
2098 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2099 """A higher level wrapper representing how an application is
2100 supposed to use sendfile().
2101 """
2102 while 1:
2103 try:
2104 if self.SUPPORT_HEADERS_TRAILERS:
2105 return os.sendfile(sock, file, offset, nbytes, headers,
2106 trailers)
2107 else:
2108 return os.sendfile(sock, file, offset, nbytes)
2109 except OSError as err:
2110 if err.errno == errno.ECONNRESET:
2111 # disconnected
2112 raise
2113 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2114 # we have to retry send data
2115 continue
2116 else:
2117 raise
2118
2119 def test_send_whole_file(self):
2120 # normal send
2121 total_sent = 0
2122 offset = 0
2123 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002124 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002125 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2126 if sent == 0:
2127 break
2128 offset += sent
2129 total_sent += sent
2130 self.assertTrue(sent <= nbytes)
2131 self.assertEqual(offset, total_sent)
2132
2133 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002134 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002135 self.client.close()
2136 self.server.wait()
2137 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002138 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002139 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002140
2141 def test_send_at_certain_offset(self):
2142 # start sending a file at a certain offset
2143 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002144 offset = len(self.DATA) // 2
2145 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002146 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002147 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002148 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2149 if sent == 0:
2150 break
2151 offset += sent
2152 total_sent += sent
2153 self.assertTrue(sent <= nbytes)
2154
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002155 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002156 self.client.close()
2157 self.server.wait()
2158 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002159 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002160 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002161 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002162 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002163
2164 def test_offset_overflow(self):
2165 # specify an offset > file size
2166 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002167 try:
2168 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2169 except OSError as e:
2170 # Solaris can raise EINVAL if offset >= file length, ignore.
2171 if e.errno != errno.EINVAL:
2172 raise
2173 else:
2174 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002175 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002176 self.client.close()
2177 self.server.wait()
2178 data = self.server.handler_instance.get_data()
2179 self.assertEqual(data, b'')
2180
2181 def test_invalid_offset(self):
2182 with self.assertRaises(OSError) as cm:
2183 os.sendfile(self.sockno, self.fileno, -1, 4096)
2184 self.assertEqual(cm.exception.errno, errno.EINVAL)
2185
2186 # --- headers / trailers tests
2187
Serhiy Storchaka43767632013-11-03 21:31:38 +02002188 @requires_headers_trailers
2189 def test_headers(self):
2190 total_sent = 0
2191 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2192 headers=[b"x" * 512])
2193 total_sent += sent
2194 offset = 4096
2195 nbytes = 4096
2196 while 1:
2197 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2198 offset, nbytes)
2199 if sent == 0:
2200 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002201 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002202 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002203
Serhiy Storchaka43767632013-11-03 21:31:38 +02002204 expected_data = b"x" * 512 + self.DATA
2205 self.assertEqual(total_sent, len(expected_data))
2206 self.client.close()
2207 self.server.wait()
2208 data = self.server.handler_instance.get_data()
2209 self.assertEqual(hash(data), hash(expected_data))
2210
2211 @requires_headers_trailers
2212 def test_trailers(self):
2213 TESTFN2 = support.TESTFN + "2"
2214 file_data = b"abcdef"
2215 with open(TESTFN2, 'wb') as f:
2216 f.write(file_data)
2217 with open(TESTFN2, 'rb')as f:
2218 self.addCleanup(os.remove, TESTFN2)
2219 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2220 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002221 self.client.close()
2222 self.server.wait()
2223 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002224 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002225
Serhiy Storchaka43767632013-11-03 21:31:38 +02002226 @requires_headers_trailers
2227 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2228 'test needs os.SF_NODISKIO')
2229 def test_flags(self):
2230 try:
2231 os.sendfile(self.sockno, self.fileno, 0, 4096,
2232 flags=os.SF_NODISKIO)
2233 except OSError as err:
2234 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2235 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002236
2237
Larry Hastings9cf065c2012-06-22 16:30:09 -07002238def supports_extended_attributes():
2239 if not hasattr(os, "setxattr"):
2240 return False
2241 try:
2242 with open(support.TESTFN, "wb") as fp:
2243 try:
2244 os.setxattr(fp.fileno(), b"user.test", b"")
2245 except OSError:
2246 return False
2247 finally:
2248 support.unlink(support.TESTFN)
2249 # Kernels < 2.6.39 don't respect setxattr flags.
2250 kernel_version = platform.release()
2251 m = re.match("2.6.(\d{1,2})", kernel_version)
2252 return m is None or int(m.group(1)) >= 39
2253
2254
2255@unittest.skipUnless(supports_extended_attributes(),
2256 "no non-broken extended attribute support")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002257class ExtendedAttributeTests(unittest.TestCase):
2258
2259 def tearDown(self):
2260 support.unlink(support.TESTFN)
2261
Larry Hastings9cf065c2012-06-22 16:30:09 -07002262 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002263 fn = support.TESTFN
2264 open(fn, "wb").close()
2265 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002266 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002267 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002268 init_xattr = listxattr(fn)
2269 self.assertIsInstance(init_xattr, list)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002270 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002271 xattr = set(init_xattr)
2272 xattr.add("user.test")
2273 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002274 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2275 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2276 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002277 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002278 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002279 self.assertEqual(cm.exception.errno, errno.EEXIST)
2280 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002281 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002282 self.assertEqual(cm.exception.errno, errno.ENODATA)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002283 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002284 xattr.add("user.test2")
2285 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002286 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002287 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002288 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002289 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002290 xattr.remove("user.test")
2291 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002292 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2293 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2294 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2295 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002296 many = sorted("user.test{}".format(i) for i in range(100))
2297 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002298 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002299 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002300
Larry Hastings9cf065c2012-06-22 16:30:09 -07002301 def _check_xattrs(self, *args, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002302 def make_bytes(s):
2303 return bytes(s, "ascii")
Larry Hastings9cf065c2012-06-22 16:30:09 -07002304 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002305 support.unlink(support.TESTFN)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002306 self._check_xattrs_str(make_bytes, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002307
2308 def test_simple(self):
2309 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2310 os.listxattr)
2311
2312 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002313 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2314 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002315
2316 def test_fds(self):
2317 def getxattr(path, *args):
2318 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002319 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002320 def setxattr(path, *args):
2321 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002322 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002323 def removexattr(path, *args):
2324 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002325 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002326 def listxattr(path, *args):
2327 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002328 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002329 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2330
2331
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002332@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2333class Win32DeprecatedBytesAPI(unittest.TestCase):
2334 def test_deprecated(self):
2335 import nt
2336 filename = os.fsencode(support.TESTFN)
2337 with warnings.catch_warnings():
2338 warnings.simplefilter("error", DeprecationWarning)
2339 for func, *args in (
2340 (nt._getfullpathname, filename),
2341 (nt._isdir, filename),
2342 (os.access, filename, os.R_OK),
2343 (os.chdir, filename),
2344 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01002345 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002346 (os.link, filename, filename),
2347 (os.listdir, filename),
2348 (os.lstat, filename),
2349 (os.mkdir, filename),
2350 (os.open, filename, os.O_RDONLY),
2351 (os.rename, filename, filename),
2352 (os.rmdir, filename),
2353 (os.startfile, filename),
2354 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002355 (os.unlink, filename),
2356 (os.utime, filename),
2357 ):
2358 self.assertRaises(DeprecationWarning, func, *args)
2359
Victor Stinner28216442011-11-16 00:34:44 +01002360 @support.skip_unless_symlink
2361 def test_symlink(self):
2362 filename = os.fsencode(support.TESTFN)
2363 with warnings.catch_warnings():
2364 warnings.simplefilter("error", DeprecationWarning)
2365 self.assertRaises(DeprecationWarning,
2366 os.symlink, filename, filename)
2367
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002368
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002369@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2370class TermsizeTests(unittest.TestCase):
2371 def test_does_not_crash(self):
2372 """Check if get_terminal_size() returns a meaningful value.
2373
2374 There's no easy portable way to actually check the size of the
2375 terminal, so let's check if it returns something sensible instead.
2376 """
2377 try:
2378 size = os.get_terminal_size()
2379 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002380 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002381 # Under win32 a generic OSError can be thrown if the
2382 # handle cannot be retrieved
2383 self.skipTest("failed to query terminal size")
2384 raise
2385
Antoine Pitroucfade362012-02-08 23:48:59 +01002386 self.assertGreaterEqual(size.columns, 0)
2387 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002388
2389 def test_stty_match(self):
2390 """Check if stty returns the same results
2391
2392 stty actually tests stdin, so get_terminal_size is invoked on
2393 stdin explicitly. If stty succeeded, then get_terminal_size()
2394 should work too.
2395 """
2396 try:
2397 size = subprocess.check_output(['stty', 'size']).decode().split()
2398 except (FileNotFoundError, subprocess.CalledProcessError):
2399 self.skipTest("stty invocation failed")
2400 expected = (int(size[1]), int(size[0])) # reversed order
2401
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002402 try:
2403 actual = os.get_terminal_size(sys.__stdin__.fileno())
2404 except OSError as e:
2405 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2406 # Under win32 a generic OSError can be thrown if the
2407 # handle cannot be retrieved
2408 self.skipTest("failed to query terminal size")
2409 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002410 self.assertEqual(expected, actual)
2411
2412
Victor Stinner292c8352012-10-30 02:17:38 +01002413class OSErrorTests(unittest.TestCase):
2414 def setUp(self):
2415 class Str(str):
2416 pass
2417
Victor Stinnerafe17062012-10-31 22:47:43 +01002418 self.bytes_filenames = []
2419 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002420 if support.TESTFN_UNENCODABLE is not None:
2421 decoded = support.TESTFN_UNENCODABLE
2422 else:
2423 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002424 self.unicode_filenames.append(decoded)
2425 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002426 if support.TESTFN_UNDECODABLE is not None:
2427 encoded = support.TESTFN_UNDECODABLE
2428 else:
2429 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002430 self.bytes_filenames.append(encoded)
2431 self.bytes_filenames.append(memoryview(encoded))
2432
2433 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002434
2435 def test_oserror_filename(self):
2436 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002437 (self.filenames, os.chdir,),
2438 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002439 (self.filenames, os.lstat,),
2440 (self.filenames, os.open, os.O_RDONLY),
2441 (self.filenames, os.rmdir,),
2442 (self.filenames, os.stat,),
2443 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002444 ]
2445 if sys.platform == "win32":
2446 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002447 (self.bytes_filenames, os.rename, b"dst"),
2448 (self.bytes_filenames, os.replace, b"dst"),
2449 (self.unicode_filenames, os.rename, "dst"),
2450 (self.unicode_filenames, os.replace, "dst"),
Victor Stinner64e039a2012-11-07 00:10:14 +01002451 # Issue #16414: Don't test undecodable names with listdir()
2452 # because of a Windows bug.
2453 #
2454 # With the ANSI code page 932, os.listdir(b'\xe7') return an
2455 # empty list (instead of failing), whereas os.listdir(b'\xff')
2456 # raises a FileNotFoundError. It looks like a Windows bug:
2457 # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
2458 # fails with ERROR_FILE_NOT_FOUND (2), instead of
2459 # ERROR_PATH_NOT_FOUND (3).
2460 (self.unicode_filenames, os.listdir,),
Victor Stinner292c8352012-10-30 02:17:38 +01002461 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002462 else:
2463 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002464 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002465 (self.filenames, os.rename, "dst"),
2466 (self.filenames, os.replace, "dst"),
2467 ))
2468 if hasattr(os, "chown"):
2469 funcs.append((self.filenames, os.chown, 0, 0))
2470 if hasattr(os, "lchown"):
2471 funcs.append((self.filenames, os.lchown, 0, 0))
2472 if hasattr(os, "truncate"):
2473 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002474 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002475 funcs.append((self.filenames, os.chflags, 0))
2476 if hasattr(os, "lchflags"):
2477 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002478 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002479 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002480 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002481 if sys.platform == "win32":
2482 funcs.append((self.bytes_filenames, os.link, b"dst"))
2483 funcs.append((self.unicode_filenames, os.link, "dst"))
2484 else:
2485 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002486 if hasattr(os, "listxattr"):
2487 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002488 (self.filenames, os.listxattr,),
2489 (self.filenames, os.getxattr, "user.test"),
2490 (self.filenames, os.setxattr, "user.test", b'user'),
2491 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002492 ))
2493 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002494 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002495 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002496 if sys.platform == "win32":
2497 funcs.append((self.unicode_filenames, os.readlink,))
2498 else:
2499 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002500
Victor Stinnerafe17062012-10-31 22:47:43 +01002501 for filenames, func, *func_args in funcs:
2502 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002503 try:
2504 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002505 except OSError as err:
Victor Stinner292c8352012-10-30 02:17:38 +01002506 self.assertIs(err.filename, name)
2507 else:
2508 self.fail("No exception thrown by {}".format(func))
2509
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002510class CPUCountTests(unittest.TestCase):
2511 def test_cpu_count(self):
2512 cpus = os.cpu_count()
2513 if cpus is not None:
2514 self.assertIsInstance(cpus, int)
2515 self.assertGreater(cpus, 0)
2516 else:
2517 self.skipTest("Could not determine the number of CPUs")
2518
Victor Stinnerdaf45552013-08-28 00:53:59 +02002519
2520class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002521 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002522 fd = os.open(__file__, os.O_RDONLY)
2523 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002524 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002525
Victor Stinnerdaf45552013-08-28 00:53:59 +02002526 os.set_inheritable(fd, True)
2527 self.assertEqual(os.get_inheritable(fd), True)
2528
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002529 @unittest.skipIf(fcntl is None, "need fcntl")
2530 def test_get_inheritable_cloexec(self):
2531 fd = os.open(__file__, os.O_RDONLY)
2532 self.addCleanup(os.close, fd)
2533 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002534
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002535 # clear FD_CLOEXEC flag
2536 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2537 flags &= ~fcntl.FD_CLOEXEC
2538 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002539
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002540 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002541
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002542 @unittest.skipIf(fcntl is None, "need fcntl")
2543 def test_set_inheritable_cloexec(self):
2544 fd = os.open(__file__, os.O_RDONLY)
2545 self.addCleanup(os.close, fd)
2546 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2547 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002548
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002549 os.set_inheritable(fd, True)
2550 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2551 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002552
Victor Stinnerdaf45552013-08-28 00:53:59 +02002553 def test_open(self):
2554 fd = os.open(__file__, os.O_RDONLY)
2555 self.addCleanup(os.close, fd)
2556 self.assertEqual(os.get_inheritable(fd), False)
2557
2558 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2559 def test_pipe(self):
2560 rfd, wfd = os.pipe()
2561 self.addCleanup(os.close, rfd)
2562 self.addCleanup(os.close, wfd)
2563 self.assertEqual(os.get_inheritable(rfd), False)
2564 self.assertEqual(os.get_inheritable(wfd), False)
2565
2566 def test_dup(self):
2567 fd1 = os.open(__file__, os.O_RDONLY)
2568 self.addCleanup(os.close, fd1)
2569
2570 fd2 = os.dup(fd1)
2571 self.addCleanup(os.close, fd2)
2572 self.assertEqual(os.get_inheritable(fd2), False)
2573
2574 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2575 def test_dup2(self):
2576 fd = os.open(__file__, os.O_RDONLY)
2577 self.addCleanup(os.close, fd)
2578
2579 # inheritable by default
2580 fd2 = os.open(__file__, os.O_RDONLY)
2581 try:
2582 os.dup2(fd, fd2)
2583 self.assertEqual(os.get_inheritable(fd2), True)
2584 finally:
2585 os.close(fd2)
2586
2587 # force non-inheritable
2588 fd3 = os.open(__file__, os.O_RDONLY)
2589 try:
2590 os.dup2(fd, fd3, inheritable=False)
2591 self.assertEqual(os.get_inheritable(fd3), False)
2592 finally:
2593 os.close(fd3)
2594
2595 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2596 def test_openpty(self):
2597 master_fd, slave_fd = os.openpty()
2598 self.addCleanup(os.close, master_fd)
2599 self.addCleanup(os.close, slave_fd)
2600 self.assertEqual(os.get_inheritable(master_fd), False)
2601 self.assertEqual(os.get_inheritable(slave_fd), False)
2602
2603
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002604@unittest.skipUnless(hasattr(os, 'get_blocking'),
2605 'needs os.get_blocking() and os.set_blocking()')
2606class BlockingTests(unittest.TestCase):
2607 def test_blocking(self):
2608 fd = os.open(__file__, os.O_RDONLY)
2609 self.addCleanup(os.close, fd)
2610 self.assertEqual(os.get_blocking(fd), True)
2611
2612 os.set_blocking(fd, False)
2613 self.assertEqual(os.get_blocking(fd), False)
2614
2615 os.set_blocking(fd, True)
2616 self.assertEqual(os.get_blocking(fd), True)
2617
2618
Yury Selivanov97e2e062014-09-26 12:33:06 -04002619
2620class ExportsTests(unittest.TestCase):
2621 def test_os_all(self):
2622 self.assertIn('open', os.__all__)
2623 self.assertIn('walk', os.__all__)
2624
2625
Antoine Pitrouf26ad712011-07-15 23:00:56 +02002626@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00002627def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002628 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00002629 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002630 StatAttributeTests,
2631 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002632 WalkTests,
Charles-François Natali7372b062012-02-05 15:15:38 +01002633 FwalkTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002634 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00002635 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00002636 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00002637 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002638 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002639 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00002640 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00002641 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002642 Win32KillTests,
Tim Golden781bbeb2013-10-25 20:24:06 +01002643 Win32ListdirTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00002644 Win32SymlinkTests,
Jason R. Coombs3a092862013-05-27 23:21:28 -04002645 NonLocalSymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00002646 FSEncodingTests,
Brett Cannonefb00c02012-02-29 18:31:31 -05002647 DeviceEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002648 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002649 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00002650 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002651 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002652 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04002653 ExtendedAttributeTests,
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002654 Win32DeprecatedBytesAPI,
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002655 TermsizeTests,
Victor Stinner292c8352012-10-30 02:17:38 +01002656 OSErrorTests,
Andrew Svetlov405faed2012-12-25 12:18:09 +02002657 RemoveDirsTests,
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002658 CPUCountTests,
Victor Stinnerdaf45552013-08-28 00:53:59 +02002659 FDInheritanceTests,
Tim Golden0321cf22014-05-05 19:46:17 +01002660 Win32JunctionTests,
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002661 BlockingTests,
Yury Selivanov97e2e062014-09-26 12:33:06 -04002662 ExportsTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002663 )
Fred Drake2e2be372001-09-20 21:33:42 +00002664
2665if __name__ == "__main__":
2666 test_main()