blob: 152ba93576285b253a3f2bd1045de5157c564939 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070027import decimal
28import fractions
Christian Heimes25827622013-10-12 01:27:08 +020029import pickle
Victor Stinnerfe02e392014-12-21 01:16:38 +010030import sysconfig
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000031try:
32 import threading
33except ImportError:
34 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020043
Georg Brandl2daf6ae2012-02-20 19:54:16 +010044from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000045
Victor Stinner034d0aa2012-06-05 01:22:15 +020046with warnings.catch_warnings():
47 warnings.simplefilter("ignore", DeprecationWarning)
48 os.stat_float_times(True)
Victor Stinner1aa54a42012-02-08 04:09:37 +010049st = os.stat(__file__)
50stat_supports_subsecond = (
51 # check if float and int timestamps are different
52 (st.st_atime != st[7])
53 or (st.st_mtime != st[8])
54 or (st.st_ctime != st[9]))
55
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    # A missing/empty version string means "not linuxthreads".
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))
Brian Curtineb24d742010-04-12 17:16:38 +000064
# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD, so platforms lacking it are safe.
HAVE_WHEEL_GROUP = (
    os.getgid() == 0 if sys.platform.startswith('freebsd') else False)
67
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for the low-level file functions of os; each creates TESTFN."""

    def setUp(self):
        # Guarantee that no stale TESTFN exists.  The same routine is reused
        # as tearDown (see the alias below) to clean up after each test.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A file we have just created must be reported as writable.
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A rename() call rejected with TypeError must not change the
        # reference count of the path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() on a raw descriptor returns a bytes object.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor: a leaked handle would keep the
            # file open and (on Windows) make tearDown's unlink fail, hiding
            # the original failure.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command given by *args* and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the descriptor with os.fdopen(*args)
        # and close the resulting file object again.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create an empty TESTFN, then check the accepted fdopen() signatures.
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination and remove
        # the source.
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
174
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200175
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for the results of os.stat()/os.statvfs() and for os.utime().

    Every test runs against a scratch directory support.TESTFN that holds
    a single three-byte file named "f1" (self.fname).
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        # The context manager closes the handle even if the write fails.
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        # Shared checks on the os.stat() result for *fname*, which may be
        # given as str or bytes.
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # The tuple interface stores integer timestamps.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing far past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but, as in the original test, not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        # Same checks as above, with the file name passed as bytes.
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        # stat_result must round-trip through every pickle protocol.
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    def _statvfs_or_skip(self):
        # Return os.statvfs(self.fname), skipping the running test when the
        # call reports ENOSYS.  Any other OSError is re-raised so that the
        # real failure is reported rather than an unbound-variable error.
        try:
            return os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            raise

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = self._statvfs_or_skip()

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for index, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[index])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but, as in the original test, not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        # statvfs_result must round-trip through every pickle protocol.
        result = self._statvfs_or_skip()

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def _test_utime(self, filename, attr, utime, delta):
        # Common body of the utime tests.
        #   filename -- path whose timestamps are set and read back
        #   attr     -- attr(stat_result, name): fetches a timestamp field
        #   utime    -- utime(filename, times): sets explicit times
        #   delta    -- tolerated difference between two "now" timestamps,
        #               in the unit returned by attr
        #
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Read back the path that was just touched (not unconditionally the
        # TESTFN directory), so both "now" samples describe the same file.
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"),
                               delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)

    def _test_utime_ns(self, set_times_ns, test_dir=True):
        # Run _test_utime() on the nanosecond (st_*_ns) fields of the file
        # and, unless test_dir is false, of the TESTFN directory as well.
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    # Skip decorators for the optional os.utime() features.
    requires_utime_dir_fd = unittest.skipUnless(
                                os.utime in os.supports_dir_fd,
                                "dir_fd support for utime required for this test.")
    requires_utime_fd = unittest.skipUnless(
                                os.utime in os.supports_fd,
                                "fd support for utime required for this test.")
    requires_utime_nofollow_symlinks = unittest.skipUnless(
                                os.utime in os.supports_follow_symlinks,
                                "follow_symlinks support for utime required for this test.")

    @requires_utime_nofollow_symlinks
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.utime(file, ns=times, follow_symlinks=False)
        self._test_utime_ns(lutimes_ns)

    @requires_utime_fd
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.utime(f.fileno(), ns=times)
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        # Passing both a times tuple and ns= must be rejected.
        with self.assertRaises(ValueError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)

    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        # Set millisecond-precision times through set_time_func(filename,
        # atime, mtime) and check that os.stat() reports them to 3 places.
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                         times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_utime_nofollow_symlinks
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime), follow_symlinks=False)
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                         times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict tests to Win32, since there is no guarantee other
    # systems support centiseconds
    def get_file_system(path):
        # Plain function (no self): it is called while the class body is
        # being executed, from the skip decorators below.  Returns the file
        # system name of the volume holding *path* on Windows, and None on
        # other platforms or when the volume query fails.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_1565150(self):
        t1 = 1159195039.25
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_large_time(self):
        t1 = 5000000000 # some day in 2128
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100535
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000536from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000537
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000538class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000539 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000540 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000541
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000542 def setUp(self):
543 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000544 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000545 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000546 for key, value in self._reference().items():
547 os.environ[key] = value
548
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000549 def tearDown(self):
550 os.environ.clear()
551 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000552 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000553 os.environb.clear()
554 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000555
Christian Heimes90333392007-11-01 19:08:42 +0000556 def _reference(self):
557 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
558
559 def _empty_mapping(self):
560 os.environ.clear()
561 return os.environ
562
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000563 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300564 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000565 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000566 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300567 os.environ.update(HELLO="World")
568 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
569 value = popen.read().strip()
570 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000571
Ezio Melottic7e139b2012-09-26 20:01:34 +0300572 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000573 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300574 with os.popen(
575 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
576 it = iter(popen)
577 self.assertEqual(next(it), "line1\n")
578 self.assertEqual(next(it), "line2\n")
579 self.assertEqual(next(it), "line3\n")
580 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000581
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000582 # Verify environ keys and values from the OS are of the
583 # correct str type.
584 def test_keyvalue_types(self):
585 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000586 self.assertEqual(type(key), str)
587 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000588
Christian Heimes90333392007-11-01 19:08:42 +0000589 def test_items(self):
590 for key, value in self._reference().items():
591 self.assertEqual(os.environ.get(key), value)
592
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000593 # Issue 7310
594 def test___repr__(self):
595 """Check that the repr() of os.environ looks like environ({...})."""
596 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000597 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
598 '{!r}: {!r}'.format(key, value)
599 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000600
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000601 def test_get_exec_path(self):
602 defpath_list = os.defpath.split(os.pathsep)
603 test_path = ['/monty', '/python', '', '/flying/circus']
604 test_env = {'PATH': os.pathsep.join(test_path)}
605
606 saved_environ = os.environ
607 try:
608 os.environ = dict(test_env)
609 # Test that defaulting to os.environ works.
610 self.assertSequenceEqual(test_path, os.get_exec_path())
611 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
612 finally:
613 os.environ = saved_environ
614
615 # No PATH environment variable
616 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
617 # Empty PATH environment variable
618 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
619 # Supplied PATH environment variable
620 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
621
Victor Stinnerb745a742010-05-18 17:17:23 +0000622 if os.supports_bytes_environ:
623 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000624 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000625 # ignore BytesWarning warning
626 with warnings.catch_warnings(record=True):
627 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000628 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000629 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000630 pass
631 else:
632 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000633
634 # bytes key and/or value
635 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
636 ['abc'])
637 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
638 ['abc'])
639 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
640 ['abc'])
641
642 @unittest.skipUnless(os.supports_bytes_environ,
643 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000644 def test_environb(self):
645 # os.environ -> os.environb
646 value = 'euro\u20ac'
647 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000648 value_bytes = value.encode(sys.getfilesystemencoding(),
649 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000650 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000651 msg = "U+20AC character is not encodable to %s" % (
652 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000653 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000654 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000655 self.assertEqual(os.environ['unicode'], value)
656 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000657
658 # os.environb -> os.environ
659 value = b'\xff'
660 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000661 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000662 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000663 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000664
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Deleting an invalid variable name from os.environ must raise."""
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        key, expected_error = 'x' * 50000, ValueError
    else:
        # "=" is not allowed in a variable name
        key, expected_error = 'key=', OSError
    self.assertRaises(expected_error, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100678
Victor Stinner6d101392013-04-14 16:35:04 +0200679 def test_key_type(self):
680 missing = 'missingkey'
681 self.assertNotIn(missing, os.environ)
682
Victor Stinner839e5ea2013-04-14 16:43:03 +0200683 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200684 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200685 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200686 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200687
Victor Stinner839e5ea2013-04-14 16:43:03 +0200688 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200689 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200690 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200691 self.assertTrue(cm.exception.__suppress_context__)
692
Victor Stinner6d101392013-04-14 16:35:04 +0200693
class WalkTests(unittest.TestCase):
    """Tests for os.walk().

    setUp() only builds the directory fixture.  Each traversal mode is
    checked by its own test method, so the checks are collected and reported
    as tests of this class instead of running as part of the fixture code.
    """

    def setUp(self):
        """Create the directory tree walked by the tests.

        The paths the tests need and the expected os.walk() entry for SUB2
        are stored as attributes on the test case.
        """
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        self.sub2_path = join(self.walk_path, "SUB2")
        self.link_path = join(self.sub2_path, "link")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(self.sub2_path, "tmp3")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(self.sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(self.sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            self.sub2_tree = (self.sub2_path, ["link"],
                              ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (self.sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        """Walk top-down and compare every yielded (root, dirs, files)."""
        walked = list(os.walk(self.walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        """Removing an entry from dirs stops os.walk() from descending into it."""
        walked = []
        for root, dirs, files in os.walk(self.walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))
        walked[1][-1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        """Walk bottom-up and compare every yielded (root, dirs, files)."""
        walked = list(os.walk(self.walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], self.sub2_tree)

    def test_walk_symlink(self):
        """With followlinks=True the walk must enter the directory symlink."""
        if not support.can_symlink():
            self.skipTest("symlinks cannot be created here")
        # Walk, following symlinks.
        for root, dirs, files in os.walk(self.walk_path, followlinks=True):
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is listed in dirs but must be
                # removed like a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000808
Charles-François Natali7372b062012-02-05 15:15:38 +0100809
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        walk_kwargs and fwalk_kwargs are keyword arguments for os.walk() and
        os.fwalk().  They are copied, then every combination of topdown and
        symlink-following is applied to both before comparing the results.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            # Index the os.walk() output by root; order is ignored by
            # comparing sets.
            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # fails there is nothing to close, and the original error must not
        # be replaced by a NameError raised from the finally clause.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Do not follow symlinks: a link to a directory has to be
                # unlinked, only a real directory can be rmdir()'ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
883
884
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), plus an argument-type check for os.chown()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion fails, so that a
        # failure here cannot change the umask seen by later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
    def test_chown_uid_gid_arguments_must_be_index(self):
        # Named "st" so the module-level "stat" module is not shadowed.
        st = os.stat(support.TESTFN)
        uid = st.st_uid
        gid = st.st_gid
        # None of these values is an integer type; each must be rejected.
        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even when an assertion fails: tearDown() calls
        # os.removedirs(), which cannot delete a regular file.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
974
Andrew Svetlov405faed2012-12-25 12:18:09 +0200975
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_chain(self):
        """Create TESTFN/dira/dirb and return the two new paths."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def _assert_existence(self, expectations):
        """Check os.path.exists() for each (path, should_exist) pair in order."""
        for path, should_exist in expectations:
            if should_exist:
                self.assertTrue(os.path.exists(path))
            else:
                self.assertFalse(os.path.exists(path))

    def test_remove_all(self):
        # Nothing but empty directories: the whole chain goes away.
        dira, dirb = self._make_chain()
        os.removedirs(dirb)
        self._assert_existence([(dirb, False),
                                (dira, False),
                                (support.TESTFN, False)])

    def test_remove_partial(self):
        # A file next to dirb keeps dira (and everything above it) alive.
        dira, dirb = self._make_chain()
        with open(os.path.join(dira, 'file.txt'), 'w') as f:
            f.write('text')
        os.removedirs(dirb)
        self._assert_existence([(dirb, False),
                                (dira, True),
                                (support.TESTFN, True)])

    def test_remove_nothing(self):
        # A file inside dirb makes the very first removal fail.
        dira, dirb = self._make_chain()
        with open(os.path.join(dirb, 'file.txt'), 'w') as f:
            f.write('text')
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        self._assert_existence([(dirb, True),
                                (dira, True),
                                (support.TESTFN, True)])
1017
1018
class DevNullTests(unittest.TestCase):
    """Tests for the null device named by os.devnull."""

    def test_devnull(self):
        """Data written to os.devnull is discarded; reading it yields nothing."""
        # The with statement closes the file; no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001026
Andrew Svetlov405faed2012-12-25 12:18:09 +02001027
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        """os.urandom(n) returns exactly n bytes."""
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        """Two consecutive 16-byte results must differ."""
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return the bytes of os.urandom(count) produced by a child interpreter.

        The child's standard output is checked to contain exactly *count*
        bytes before it is returned.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare against the requested size rather than a fixed 16, so the
        # helper is correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        """Two separate child processes must not produce the same bytes."""
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001056
Victor Stinnerfe02e392014-12-21 01:16:38 +01001057
# True when the build configuration reports HAVE_GETENTROPY as 1.
HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)

@unittest.skipIf(HAVE_GETENTROPY,
                 "getentropy() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests of os.urandom() behaviour around its file descriptor.

    Every scenario runs in a child interpreter started with
    assert_python_ok().
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check that urandom() fails with EMFILE once the soft limit on open
        # files has been lowered to 1.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            os.urandom(4)
            os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only a successful exit of the child is required here.
        result = assert_python_ok('-Sc', code)
        rc, out, err = result

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        with open(support.TESTFN, 'wb') as f:
            f.write(b"x" * 256)
        self.addCleanup(os.unlink, support.TESTFN)
        code = """if 1:
            import os
            import sys
            os.urandom(4)
            for fd in range(3, 256):
                try:
                    os.close(fd)
                except OSError:
                    pass
                else:
                    # Found the urandom fd (XXX hopefully)
                    break
            os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # First run: two 4-byte reads that must differ from each other.
        rc, first_out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(first_out), 8)
        self.assertNotEqual(first_out[0:4], first_out[4:8])
        # Second run: its output must differ from the first run's output.
        rc, second_out, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(second_out), 8)
        self.assertNotEqual(second_out, first_out)
1128
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001129
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If defpath is not None, os.defpath is replaced by it for the duration of
    the context.  The value yielded is the list of recorded calls; os.execv,
    os.execve and os.defpath are restored when the context exits.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals before entering the try block, so the finally
    # clause can never refer to a name that was not bound yet.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1162
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() function."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list is rejected with ValueError.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Check which exec function os._execvpe() calls, and with what.

        test_type is either bytes or str and selects the type used for the
        program name.  os.execv and os.execve are replaced by recording
        mocks for each scenario.
        """
        program_path = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Outside Windows the recorded path is expected as bytes.
            native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        # The str variant always runs; the bytes variant is skipped on
        # Windows (os.name == "nt").
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001226
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001227
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Each os call below must raise OSError when applied to support.TESTFN.

    NOTE(review): apart from test_mkdir, which creates the file itself, the
    tests presumably rely on TESTFN not existing -- confirm.
    """

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir must fail because a regular file already uses the name.
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)
        finally:
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001252
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001253class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001254 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001255 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1256 #singles.append("close")
1257 #We omit close because it doesn't raise an exception on some platforms
1258 def get_single(f):
1259 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001260 if hasattr(os, f):
1261 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001262 return helper
1263 for f in singles:
1264 locals()["test_"+f] = get_single(f)
1265
Benjamin Peterson7522c742009-01-19 21:00:09 +00001266 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001267 try:
1268 f(support.make_bad_fd(), *args)
1269 except OSError as e:
1270 self.assertEqual(e.errno, errno.EBADF)
1271 else:
1272 self.fail("%r didn't raise a OSError with a bad file descriptor"
1273 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001274
Serhiy Storchaka43767632013-11-03 21:31:38 +02001275 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001276 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001277 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001278
Serhiy Storchaka43767632013-11-03 21:31:38 +02001279 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001280 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001281 fd = support.make_bad_fd()
1282 # Make sure none of the descriptors we are about to close are
1283 # currently valid (issue 6542).
1284 for i in range(10):
1285 try: os.fstat(fd+i)
1286 except OSError:
1287 pass
1288 else:
1289 break
1290 if i < 2:
1291 raise unittest.SkipTest(
1292 "Unable to acquire a range of invalid file descriptors")
1293 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001294
Serhiy Storchaka43767632013-11-03 21:31:38 +02001295 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001296 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001297 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001298
Serhiy Storchaka43767632013-11-03 21:31:38 +02001299 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001300 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001301 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001302
Serhiy Storchaka43767632013-11-03 21:31:38 +02001303 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001304 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001305 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001306
Serhiy Storchaka43767632013-11-03 21:31:38 +02001307 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001308 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001309 self.check(os.pathconf, "PC_NAME_MAX")
1310 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001311
Serhiy Storchaka43767632013-11-03 21:31:38 +02001312 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001313 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001314 self.check(os.truncate, 0)
1315 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001316
Serhiy Storchaka43767632013-11-03 21:31:38 +02001317 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001318 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001319 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001320
Serhiy Storchaka43767632013-11-03 21:31:38 +02001321 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001322 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001323 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001324
Victor Stinner57ddf782014-01-08 15:21:28 +01001325 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1326 def test_readv(self):
1327 buf = bytearray(10)
1328 self.check(os.readv, [buf])
1329
Serhiy Storchaka43767632013-11-03 21:31:38 +02001330 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001331 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001332 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001333
Serhiy Storchaka43767632013-11-03 21:31:38 +02001334 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001335 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001336 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001337
Victor Stinner57ddf782014-01-08 15:21:28 +01001338 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1339 def test_writev(self):
1340 self.check(os.writev, [b'abc'])
1341
Brian Curtin1b9df392010-11-24 20:24:31 +00001342
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        # Source file and hard-link name used by every test.
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files the test left behind.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        """Create file1, hard-link it to file2, check both are one file."""
        with open(file1, "w") as source:
            source.write("test")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)

        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(encoding),
                        self.file2.encode(encoding))

    def test_unicode_name(self):
        # Skip when the filesystem encoding cannot represent the
        # character appended to the file names below.
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # tearDown reads these attributes, so it cleans up the renamed
        # files as well.
        self.file1 = self.file1 + "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1379
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Argument and permission checks for the os.set*uid/set*gid family.

    Each test expects OSError when switching to id 0 without being root
    (the gid variants additionally require HAVE_WHEEL_GROUP to be false)
    and OverflowError for an id of 1<<32.
    """

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1<<32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # The overflow check applies to each argument position.
        with self.assertRaises(OverflowError):
            os.setreuid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        code = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', code])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        # The overflow check applies to each argument position.
        with self.assertRaises(OverflowError):
            os.setregid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        code = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', code])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001435
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Exercise os functions on directory entries with non-ASCII names.

    setUp creates a directory holding one empty file per name that
    os.fsencode() can encode; self.unicodefn is the set of those names
    after decoding them again with os.fsdecode().
    """

    def setUp(self):
        # Prefer the most "difficult" directory name that support offers.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        # Keep only the names the filesystem encoding can represent.
        bytesfn = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                name = os.fsdecode(raw_name)
                if name in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(name)
        except BaseException:
            # tearDown does not run when setUp fails, so remove the
            # directory here before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            implicit = set(os.listdir())
            explicit = set(os.listdir(os.sep))
            self.assertEqual(implicit, explicit)
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each file must succeed; the content is irrelevant.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_path = os.path.join(self.dir, name)
            os.stat(full_path)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001507
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows: exit codes and console events."""

    def _kill(self, sig):
        """Start a child interpreter, kill it with *sig*, check its status.

        The child writes a short message to stdout to announce that the
        interpreter is up.  Once that message is seen, *sig* is sent with
        os.kill() and the child's return code must equal *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for the message for at most max_tries * 0.1 seconds.  The
        # else clause runs when the loop ends without a break, i.e. on
        # timeout or when the child exited before saying anything.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and require it to stop.

        *name* is only used in the failure message.  The child runs
        win_console_handler.py and receives the name of a one-byte shared
        memory mapping; this method waits for that byte to become 1
        before sending the event.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test ends, whatever the outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # The loop also ends when the child has already exited, in
            # which case there is nothing left to kill.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit status of 0 means it has stopped and must not be killed.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1622
1623
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0, SUB1, FILE0 and FILE1 and
        # remember their names, sorted, for comparison in the tests.
        names = []
        for index in range(2):
            subdir = 'SUB%d' % index
            filename = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, filename)
            os.makedirs(os.path.join(support.TESTFN, subdir))
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [subdir, filename]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)
        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)
1668
1669
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic links to files and directories on Windows."""

    # Link names are relative, so the links are created in the current
    # working directory.  The targets are this test file and the
    # directory that contains it.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: both targets exist and no link is left over from
        # an earlier run.  unittest assertions are used because plain
        # assert statements are removed when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is needed here: exists() is False for a link whose
        # target is missing.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same two checks are repeated with the link name as bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # os.stat() through a relative symlink must give the same result
        # from the link's own directory, from the directory above it and
        # from the directory below it.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so that the cleanup code can
        # always refer to it, even when creating the directories fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Remove only what was actually created, so that a failure
            # during setup is not masked by a second error raised here.
            if os.path.exists(file1):
                os.remove(file1)
            if os.path.exists(level1):
                shutil.rmtree(level1)
1787
Brian Curtind40e6f72010-07-08 21:39:08 +00001788
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check symlinks whose target is relative to the link, not the cwd."""

    def setUp(self):
        # Raw docstring: the backslash in the diagram below would
        # otherwise be an invalid escape sequence.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion instead of a bare assert, so the check is
        # still performed when Python runs with -O.
        self.assertTrue(os.path.isdir(src))
1819
1820
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # A value that already has the target type comes back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # fsdecode(fsencode(x)) == x for every name that the filesystem
        # encoding can represent; names it cannot encode are skipped.
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                continue
            roundtrip = os.fsdecode(encoded)
            self.assertEqual(roundtrip, name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001834
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001835
Brett Cannonefb00c02012-02-29 18:31:31 -05001836
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    # The skip condition is evaluated once, when the class body runs.
    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win')
            or all(hasattr(locale, attr)
                   for attr in ('nl_langinfo', 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a terminal here, so an encoding must be reported and it
        # must be one the codecs module knows.
        encoding = os.device_encoding(0)
        self.assertIsNotNone(encoding)
        self.assertTrue(codecs.lookup(encoding))
1850
1851
class PidTests(unittest.TestCase):
    """Tests for the process-id functions of the os module."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid on stdout.
        command = [sys.executable, '-c',
                   'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1861
1862
Brian Curtin0151b8e2010-09-24 13:43:43 +00001863# The introduction of this TestCase caused at least two different errors on
1864# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Tests for os.getlogin(); the whole class is currently skipped."""

    def test_getlogin(self):
        # The login name must not be empty.
        self.assertNotEqual(len(os.getlogin()), 0)
1871
1872
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise this process's priority value by one, read it back, then
        # try to restore the original value.
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest(
      "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                # A failure with EACCES while restoring is tolerated;
                # any other error is propagated.
                if err.errno != errno.EACCES:
                    raise
1895
1896
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening asyncore server that runs its loop in its own thread.

        Each accepted connection is wrapped in a Handler, which sends a
        "220 ready" greeting and buffers everything it receives.  The
        most recent handler is available as handler_instance.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that collects all received data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting; clients can wait for it to synchronize.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception being handled instead of using
                # the handle_error() inherited from the base class.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Read back the bound address from the socket.
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True between the start of run() and a call to stop()."""
            return self._active

        def start(self):
            """Start the server thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to finish and wait for the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # The with statement releases the lock even when
                # asyncore.loop() raises (both handle_error() methods
                # re-raise), so the lock is never left held.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        # Read events on the listening socket are handled the same way.
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception being handled instead of using the
            # handle_error() inherited from the base class.
            raise
1980
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001981
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile().

    Every test sends (part of) a data file over a client socket connected
    to a SendfileTestServer and then compares what the server's handler
    returns from get_data() with what was expected.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # On these platforms sendfile_wrapper() calls os.sendfile() without the
    # headers/trailers arguments and the headers/trailers tests are skipped.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(
        SUPPORT_HEADERS_TRAILERS, 'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        # Every resource is released through addCleanup() so that nothing
        # leaks when setUp() itself fails part-way (tearDown() would not be
        # called in that case).  Cleanups run in reverse registration
        # order: data file, client socket, then the server.
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.addCleanup(self._stop_server)
        self.client = socket.socket()
        self.addCleanup(self.client.close)
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        self.fileno = self.file.fileno()

    def _stop_server(self):
        """Stop the server unless the test already did so via wait()."""
        if self.server.running:
            self.server.stop()

    def _collect_received(self, shutdown=True):
        """Close the client, wait for the server and return its data.

        When *shutdown* is true the socket is shut down in both directions
        before it is closed.
        """
        if shutdown:
            self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        return self.server.handler_instance.get_data()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        The call is retried for as long as it fails with EAGAIN or EBUSY;
        any other OSError is propagated.  Returns whatever os.sendfile()
        returns.  *headers* and *trailers* default to empty lists and are
        only passed on when SUPPORT_HEADERS_TRAILERS is true.
        """
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        data = self._collect_received()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        data = self._collect_received()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        data = self._collect_received()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        data = self._collect_received(shutdown=False)
        # Compare the bytes themselves (length first, as the other tests
        # do) rather than only their hashes.
        self.assertEqual(len(data), len(expected_data))
        self.assertEqual(data, expected_data)

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"
        # Register the removal before creating the file so that it is
        # cleaned up even if writing or reopening it fails.
        self.addCleanup(support.unlink, TESTFN2)
        with open(TESTFN2, 'wb') as f:
            f.write(file_data)
        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            data = self._collect_received(shutdown=False)
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY and EAGAIN are tolerated; anything else is a failure.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002158
2159
def supports_extended_attributes():
    """Return True if extended attributes can be used by the tests.

    False is returned when os.setxattr() does not exist, when setting a
    "user.test" attribute on a scratch file fails with OSError, or when
    platform.release() reports a 2.6.x kernel older than 2.6.39.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string so that "\d" is a regex escape, and escaped dots so that
    # only a literal "2.6." prefix matches.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2175
2176
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr/setxattr/removexattr/listxattr.

    The same scenario is run with paths and with file descriptors, and
    with attribute names given both as str and as bytes.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _assert_errno(self, code, func, *args, **kwargs):
        # Call func and require that it fails with OSError(errno=code).
        with self.assertRaises(OSError) as ctx:
            func(*args, **kwargs)
        self.assertEqual(ctx.exception.errno, code)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # s converts an attribute name into the type under test; kwargs are
        # forwarded to getxattr/setxattr/removexattr (not to listxattr).
        path = support.TESTFN
        # Create (or truncate) the test file.
        with open(path, "wb"):
            pass

        # The attribute does not exist yet.
        self._assert_errno(errno.ENODATA,
                           getxattr, path, s("user.test"), **kwargs)
        baseline = listxattr(path)
        self.assertIsInstance(baseline, list)

        # Create it empty, then replace its value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(baseline)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses an existing name, XATTR_REPLACE a missing one.
        self._assert_errno(errno.EEXIST, setxattr, path, s("user.test"),
                           b"bye", os.XATTR_CREATE, **kwargs)
        self._assert_errno(errno.ENODATA, setxattr, path, s("user.test2"),
                           b"bye", os.XATTR_REPLACE, **kwargs)
        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removing one attribute leaves the other one untouched.
        removexattr(path, s("user.test"), **kwargs)
        self._assert_errno(errno.ENODATA,
                           getxattr, path, s("user.test"), **kwargs)
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # A 1024-byte value round-trips.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)

        # One hundred attributes at once.
        bulk_names = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in bulk_names:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)),
                         set(baseline) | set(bulk_names))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario with str names first, then with bytes names.
        def as_ascii_bytes(text):
            return bytes(text, "ascii")

        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(as_ascii_bytes, *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrappers that open the path and hand the descriptor to os.*xattr.
        def fd_getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def fd_setxattr(path, *args):
            with open(path, "wb") as stream:
                os.setxattr(stream.fileno(), *args)

        def fd_removexattr(path, *args):
            with open(path, "wb") as stream:
                os.removexattr(stream.fileno(), *args)

        def fd_listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(fd_getxattr, fd_setxattr, fd_removexattr,
                           fd_listxattr)
2252
2253
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Calling these functions with a bytes filename must raise
    DeprecationWarning once that warning is turned into an error."""

    def test_deprecated(self):
        import nt
        bytes_name = os.fsencode(support.TESTFN)
        # Each entry is (function, positional arguments...).
        calls = (
            (nt._getfullpathname, bytes_name),
            (nt._isdir, bytes_name),
            (os.access, bytes_name, os.R_OK),
            (os.chdir, bytes_name),
            (os.chmod, bytes_name, 0o777),
            (os.getcwdb,),
            (os.link, bytes_name, bytes_name),
            (os.listdir, bytes_name),
            (os.lstat, bytes_name),
            (os.mkdir, bytes_name),
            (os.open, bytes_name, os.O_RDONLY),
            (os.rename, bytes_name, bytes_name),
            (os.rmdir, bytes_name),
            (os.startfile, bytes_name),
            (os.stat, bytes_name),
            (os.unlink, bytes_name),
            (os.utime, bytes_name),
        )
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            for call in calls:
                func, call_args = call[0], call[1:]
                with self.assertRaises(DeprecationWarning):
                    func(*call_args)

    @support.skip_unless_symlink
    def test_symlink(self):
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(bytes_name, bytes_name)
2289
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002290
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args) or skip the running test.

        The test is skipped when the call fails with EINVAL or ENOTTY, or
        with any OSError on win32; other OSErrors are propagated.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._terminal_size_or_skip()
        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly. If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stty's error output is discarded: a failure only means the
            # test is skipped, so the message is just noise.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            # stty is missing, not executable, or exited with an error.
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2333
2334
class OSErrorTests(unittest.TestCase):
    """Check that a failing os function stores the very object that was
    passed as the path in OSError.filename."""

    def setUp(self):
        class Str(str):
            pass

        # Prefer the special names provided by test.support when they are
        # available, otherwise fall back to TESTFN.
        text_name = support.TESTFN_UNENCODABLE
        if text_name is None:
            text_name = support.TESTFN
        raw_name = support.TESTFN_UNDECODABLE
        if raw_name is None:
            raw_name = os.fsencode(support.TESTFN)

        self.bytes_filenames = [raw_name, memoryview(raw_name)]
        self.unicode_filenames = [text_name, Str(text_name)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames

        # Each entry is (filenames to try, function, extra arguments...).
        funcs = [
            (every, os.chdir),
            (every, os.chmod, 0o777),
            (every, os.lstat),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir),
            (every, os.stat),
            (every, os.unlink),
        ]
        if on_windows:
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (as_text, os.listdir),
            ]
        else:
            funcs += [
                (every, os.listdir),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((as_text if on_windows else every, os.readlink))

        for names, func, *extra_args in funcs:
            for name in names:
                try:
                    func(name, *extra_args)
                except OSError as err:
                    # Identity, not equality: the same object must come back.
                    self.assertIs(err.filename, name)
                else:
                    self.fail("No exception thrown by {}".format(func))
2431
class CPUCountTests(unittest.TestCase):
    """Sanity check for os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            # skipTest() raises, so nothing below runs in this case.
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2440
Victor Stinnerdaf45552013-08-28 00:53:59 +02002441
class FDInheritanceTests(unittest.TestCase):
    """Tests for os.get_inheritable()/os.set_inheritable() and for the
    inheritable flag of descriptors created by other os functions."""

    def _open_this_file(self):
        # Open this source file read-only; the descriptor is closed when
        # the test finishes.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        without_cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, without_cloexec)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec_bit = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec_bit, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        cloexec_bit = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec_bit, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertEqual(os.get_inheritable(read_end), False)
        self.assertEqual(os.get_inheritable(write_end), False)

    def test_dup(self):
        original = self._open_this_file()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_this_file()

        scenarios = (
            # inheritable by default
            ({}, True),
            # force non-inheritable
            ({"inheritable": False}, False),
        )
        for dup2_kwargs, expected in scenarios:
            target = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(fd, target, **dup2_kwargs)
                self.assertEqual(os.get_inheritable(target), expected)
            finally:
                os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
2524
2525
@support.reap_threads
def test_main():
    """Run every test case class of this module through test.support."""
    test_cases = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32ListdirTests,
        Win32SymlinkTests,
        NonLocalSymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
        OSErrorTests,
        RemoveDirsTests,
        CPUCountTests,
        FDInheritanceTests,
    )
    support.run_unittest(*test_cases)


if __name__ == "__main__":
    test_main()