blob: e129beff5476519b1700827ca9021575a901f18e [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070027import decimal
28import fractions
Christian Heimes25827622013-10-12 01:27:08 +020029import pickle
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000030try:
31 import threading
32except ImportError:
33 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020042
Georg Brandl2daf6ae2012-02-20 19:54:16 +010043from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000044
Victor Stinner034d0aa2012-06-05 01:22:15 +020045with warnings.catch_warnings():
46 warnings.simplefilter("ignore", DeprecationWarning)
47 os.stat_float_times(True)
Victor Stinner1aa54a42012-02-08 04:09:37 +010048st = os.stat(__file__)
49stat_supports_subsecond = (
50 # check if float and int timestamps are different
51 (st.st_atime != st[7])
52 or (st.st_mtime != st[8])
53 or (st.st_ctime != st[9]))
54
Mark Dickinson7cf03892010-04-16 13:45:35 +000055# Detect whether we're on a Linux system that uses the (now outdated
56# and unmaintained) linuxthreads threading library. There's an issue
57# when combining linuxthreads with a failed execv call: see
58# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020059if hasattr(sys, 'thread_info') and sys.thread_info.version:
60 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
61else:
62 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000063
Stefan Krahebee49a2013-01-17 15:31:00 +010064# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
65HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
66
Thomas Wouters0e3f5912006-08-11 14:57:12 +000067# Tests creating TESTFN
68class FileTests(unittest.TestCase):
69 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000070 if os.path.exists(support.TESTFN):
71 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000072 tearDown = setUp
73
74 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000075 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000076 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000077 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000078
Christian Heimesfdab48e2008-01-20 09:06:41 +000079 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000080 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
81 # We must allocate two consecutive file descriptors, otherwise
82 # it will mess up other file descriptors (perhaps even the three
83 # standard ones).
84 second = os.dup(first)
85 try:
86 retries = 0
87 while second != first + 1:
88 os.close(first)
89 retries += 1
90 if retries > 10:
91 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000092 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000093 first, second = second, os.dup(second)
94 finally:
95 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000096 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000097 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000098 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000099
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000100 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000101 def test_rename(self):
102 path = support.TESTFN
103 old = sys.getrefcount(path)
104 self.assertRaises(TypeError, os.rename, path, 0)
105 new = sys.getrefcount(path)
106 self.assertEqual(old, new)
107
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000108 def test_read(self):
109 with open(support.TESTFN, "w+b") as fobj:
110 fobj.write(b"spam")
111 fobj.flush()
112 fd = fobj.fileno()
113 os.lseek(fd, 0, 0)
114 s = os.read(fd, 4)
115 self.assertEqual(type(s), bytes)
116 self.assertEqual(s, b"spam")
117
118 def test_write(self):
119 # os.write() accepts bytes- and buffer-like objects but not strings
120 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
121 self.assertRaises(TypeError, os.write, fd, "beans")
122 os.write(fd, b"bacon\n")
123 os.write(fd, bytearray(b"eggs\n"))
124 os.write(fd, memoryview(b"spam\n"))
125 os.close(fd)
126 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000127 self.assertEqual(fobj.read().splitlines(),
128 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000129
Victor Stinnere0daff12011-03-20 23:36:35 +0100130 def write_windows_console(self, *args):
131 retcode = subprocess.call(args,
132 # use a new console to not flood the test output
133 creationflags=subprocess.CREATE_NEW_CONSOLE,
134 # use a shell to hide the console window (SW_HIDE)
135 shell=True)
136 self.assertEqual(retcode, 0)
137
138 @unittest.skipUnless(sys.platform == 'win32',
139 'test specific to the Windows console')
140 def test_write_windows_console(self):
141 # Issue #11395: the Windows console returns an error (12: not enough
142 # space error) on writing into stdout if stdout mode is binary and the
143 # length is greater than 66,000 bytes (or less, depending on heap
144 # usage).
145 code = "print('x' * 100000)"
146 self.write_windows_console(sys.executable, "-c", code)
147 self.write_windows_console(sys.executable, "-u", "-c", code)
148
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000149 def fdopen_helper(self, *args):
150 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200151 f = os.fdopen(fd, *args)
152 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000153
154 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200155 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
156 os.close(fd)
157
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000158 self.fdopen_helper()
159 self.fdopen_helper('r')
160 self.fdopen_helper('r', 100)
161
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100162 def test_replace(self):
163 TESTFN2 = support.TESTFN + ".2"
164 with open(support.TESTFN, 'w') as f:
165 f.write("1")
166 with open(TESTFN2, 'w') as f:
167 f.write("2")
168 self.addCleanup(os.unlink, TESTFN2)
169 os.replace(support.TESTFN, TESTFN2)
170 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
171 with open(TESTFN2, 'r') as f:
172 self.assertEqual(f.read(), "1")
173
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200174
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000175# Test attributes on return values from os.*stat* family.
176class StatAttributeTests(unittest.TestCase):
177 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000178 os.mkdir(support.TESTFN)
179 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000180 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000181 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000182 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000183
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000184 def tearDown(self):
185 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000186 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000187
Serhiy Storchaka43767632013-11-03 21:31:38 +0200188 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000189 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000190 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000191
192 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000193 self.assertEqual(result[stat.ST_SIZE], 3)
194 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000195
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000196 # Make sure all the attributes are there
197 members = dir(result)
198 for name in dir(stat):
199 if name[:3] == 'ST_':
200 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000201 if name.endswith("TIME"):
202 def trunc(x): return int(x)
203 else:
204 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000205 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000206 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000207 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000208
Larry Hastings6fe20b32012-04-19 15:07:49 -0700209 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700210 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700211 for name in 'st_atime st_mtime st_ctime'.split():
212 floaty = int(getattr(result, name) * 100000)
213 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700214 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700215
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000216 try:
217 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200218 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000219 except IndexError:
220 pass
221
222 # Make sure that assignment fails
223 try:
224 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200225 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000226 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000227 pass
228
229 try:
230 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200231 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000232 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000233 pass
234
235 try:
236 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200237 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000238 except AttributeError:
239 pass
240
241 # Use the stat_result constructor with a too-short tuple.
242 try:
243 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200244 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000245 except TypeError:
246 pass
247
Ezio Melotti42da6632011-03-15 05:18:48 +0200248 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249 try:
250 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
251 except TypeError:
252 pass
253
Antoine Pitrou38425292010-09-21 18:19:07 +0000254 def test_stat_attributes(self):
255 self.check_stat_attributes(self.fname)
256
257 def test_stat_attributes_bytes(self):
258 try:
259 fname = self.fname.encode(sys.getfilesystemencoding())
260 except UnicodeEncodeError:
261 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100262 with warnings.catch_warnings():
263 warnings.simplefilter("ignore", DeprecationWarning)
264 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000265
Christian Heimes25827622013-10-12 01:27:08 +0200266 def test_stat_result_pickle(self):
267 result = os.stat(self.fname)
268 p = pickle.dumps(result)
269 self.assertIn(b'\x03cos\nstat_result\n', p)
270 unpickled = pickle.loads(p)
271 self.assertEqual(result, unpickled)
272
Serhiy Storchaka43767632013-11-03 21:31:38 +0200273 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000274 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000275 try:
276 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000277 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000278 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000279 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200280 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000281
282 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000283 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000285 # Make sure all the attributes are there.
286 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
287 'ffree', 'favail', 'flag', 'namemax')
288 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000289 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000290
291 # Make sure that assignment really fails
292 try:
293 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200294 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000295 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000296 pass
297
298 try:
299 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200300 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000301 except AttributeError:
302 pass
303
304 # Use the constructor with a too-short tuple.
305 try:
306 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200307 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000308 except TypeError:
309 pass
310
Ezio Melotti42da6632011-03-15 05:18:48 +0200311 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000312 try:
313 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
314 except TypeError:
315 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000316
Christian Heimes25827622013-10-12 01:27:08 +0200317 @unittest.skipUnless(hasattr(os, 'statvfs'),
318 "need os.statvfs()")
319 def test_statvfs_result_pickle(self):
320 try:
321 result = os.statvfs(self.fname)
322 except OSError as e:
323 # On AtheOS, glibc always returns ENOSYS
324 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200325 self.skipTest('os.statvfs() failed with ENOSYS')
326
Christian Heimes25827622013-10-12 01:27:08 +0200327 p = pickle.dumps(result)
328 self.assertIn(b'\x03cos\nstatvfs_result\n', p)
329 unpickled = pickle.loads(p)
330 self.assertEqual(result, unpickled)
331
Thomas Wouters89f507f2006-12-13 04:49:30 +0000332 def test_utime_dir(self):
333 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000334 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000335 # round to int, because some systems may support sub-second
336 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000337 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
338 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000339 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000340
Larry Hastings76ad59b2012-05-03 00:30:07 -0700341 def _test_utime(self, filename, attr, utime, delta):
Brian Curtin0277aa32011-11-06 13:50:15 -0600342 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600343 # second argument. Check that the previous methods of passing
344 # a time tuple or None work in addition to no argument.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700345 st0 = os.stat(filename)
Brian Curtin52fbea12011-11-06 13:41:17 -0600346 # Doesn't set anything new, but sets the time tuple way
Larry Hastings76ad59b2012-05-03 00:30:07 -0700347 utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
348 # Setting the time to the time you just read, then reading again,
349 # should always return exactly the same times.
350 st1 = os.stat(filename)
351 self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
352 self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
Brian Curtin52fbea12011-11-06 13:41:17 -0600353 # Set to the current time in the old explicit way.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700354 os.utime(filename, None)
Brian Curtin52fbea12011-11-06 13:41:17 -0600355 st2 = os.stat(support.TESTFN)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700356 # Set to the current time in the new way
357 os.utime(filename)
358 st3 = os.stat(filename)
359 self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)
360
361 def test_utime(self):
362 def utime(file, times):
363 return os.utime(file, times)
364 self._test_utime(self.fname, getattr, utime, 10)
365 self._test_utime(support.TESTFN, getattr, utime, 10)
366
367
368 def _test_utime_ns(self, set_times_ns, test_dir=True):
369 def getattr_ns(o, attr):
370 return getattr(o, attr + "_ns")
371 ten_s = 10 * 1000 * 1000 * 1000
372 self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
373 if test_dir:
374 self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)
375
376 def test_utime_ns(self):
377 def utime_ns(file, times):
378 return os.utime(file, ns=times)
379 self._test_utime_ns(utime_ns)
380
Larry Hastings9cf065c2012-06-22 16:30:09 -0700381 requires_utime_dir_fd = unittest.skipUnless(
382 os.utime in os.supports_dir_fd,
383 "dir_fd support for utime required for this test.")
384 requires_utime_fd = unittest.skipUnless(
385 os.utime in os.supports_fd,
386 "fd support for utime required for this test.")
387 requires_utime_nofollow_symlinks = unittest.skipUnless(
388 os.utime in os.supports_follow_symlinks,
389 "follow_symlinks support for utime required for this test.")
Larry Hastings76ad59b2012-05-03 00:30:07 -0700390
Larry Hastings9cf065c2012-06-22 16:30:09 -0700391 @requires_utime_nofollow_symlinks
Larry Hastings76ad59b2012-05-03 00:30:07 -0700392 def test_lutimes_ns(self):
393 def lutimes_ns(file, times):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700394 return os.utime(file, ns=times, follow_symlinks=False)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700395 self._test_utime_ns(lutimes_ns)
396
Larry Hastings9cf065c2012-06-22 16:30:09 -0700397 @requires_utime_fd
Larry Hastings76ad59b2012-05-03 00:30:07 -0700398 def test_futimes_ns(self):
399 def futimes_ns(file, times):
400 with open(file, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700401 os.utime(f.fileno(), ns=times)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700402 self._test_utime_ns(futimes_ns, test_dir=False)
403
404 def _utime_invalid_arguments(self, name, arg):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700405 with self.assertRaises(ValueError):
Larry Hastings76ad59b2012-05-03 00:30:07 -0700406 getattr(os, name)(arg, (5, 5), ns=(5, 5))
407
408 def test_utime_invalid_arguments(self):
409 self._utime_invalid_arguments('utime', self.fname)
410
Brian Curtin52fbea12011-11-06 13:41:17 -0600411
Victor Stinner1aa54a42012-02-08 04:09:37 +0100412 @unittest.skipUnless(stat_supports_subsecond,
413 "os.stat() doesn't has a subsecond resolution")
Victor Stinnera2f7c002012-02-08 03:36:25 +0100414 def _test_utime_subsecond(self, set_time_func):
Victor Stinnerbe557de2012-02-08 03:01:11 +0100415 asec, amsec = 1, 901
416 atime = asec + amsec * 1e-3
Victor Stinnera2f7c002012-02-08 03:36:25 +0100417 msec, mmsec = 2, 901
Victor Stinnerbe557de2012-02-08 03:01:11 +0100418 mtime = msec + mmsec * 1e-3
419 filename = self.fname
Victor Stinnera2f7c002012-02-08 03:36:25 +0100420 os.utime(filename, (0, 0))
421 set_time_func(filename, atime, mtime)
Victor Stinner034d0aa2012-06-05 01:22:15 +0200422 with warnings.catch_warnings():
423 warnings.simplefilter("ignore", DeprecationWarning)
424 os.stat_float_times(True)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100425 st = os.stat(filename)
426 self.assertAlmostEqual(st.st_atime, atime, places=3)
427 self.assertAlmostEqual(st.st_mtime, mtime, places=3)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100428
Victor Stinnera2f7c002012-02-08 03:36:25 +0100429 def test_utime_subsecond(self):
430 def set_time(filename, atime, mtime):
431 os.utime(filename, (atime, mtime))
432 self._test_utime_subsecond(set_time)
433
Larry Hastings9cf065c2012-06-22 16:30:09 -0700434 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100435 def test_futimes_subsecond(self):
436 def set_time(filename, atime, mtime):
437 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700438 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100439 self._test_utime_subsecond(set_time)
440
Larry Hastings9cf065c2012-06-22 16:30:09 -0700441 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100442 def test_futimens_subsecond(self):
443 def set_time(filename, atime, mtime):
444 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700445 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100446 self._test_utime_subsecond(set_time)
447
Larry Hastings9cf065c2012-06-22 16:30:09 -0700448 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100449 def test_futimesat_subsecond(self):
450 def set_time(filename, atime, mtime):
451 dirname = os.path.dirname(filename)
452 dirfd = os.open(dirname, os.O_RDONLY)
453 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700454 os.utime(os.path.basename(filename), dir_fd=dirfd,
455 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100456 finally:
457 os.close(dirfd)
458 self._test_utime_subsecond(set_time)
459
Larry Hastings9cf065c2012-06-22 16:30:09 -0700460 @requires_utime_nofollow_symlinks
Victor Stinnera2f7c002012-02-08 03:36:25 +0100461 def test_lutimes_subsecond(self):
462 def set_time(filename, atime, mtime):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700463 os.utime(filename, (atime, mtime), follow_symlinks=False)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100464 self._test_utime_subsecond(set_time)
465
Larry Hastings9cf065c2012-06-22 16:30:09 -0700466 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100467 def test_utimensat_subsecond(self):
468 def set_time(filename, atime, mtime):
469 dirname = os.path.dirname(filename)
470 dirfd = os.open(dirname, os.O_RDONLY)
471 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700472 os.utime(os.path.basename(filename), dir_fd=dirfd,
473 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100474 finally:
475 os.close(dirfd)
476 self._test_utime_subsecond(set_time)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100477
Serhiy Storchaka43767632013-11-03 21:31:38 +0200478 # Restrict tests to Win32, since there is no guarantee other
Thomas Wouters89f507f2006-12-13 04:49:30 +0000479 # systems support centiseconds
Serhiy Storchaka43767632013-11-03 21:31:38 +0200480 def get_file_system(path):
481 if sys.platform == 'win32':
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000482 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000483 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000484 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000485 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000486 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000487 return buf.value
488
Serhiy Storchaka43767632013-11-03 21:31:38 +0200489 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
490 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
491 "requires NTFS")
492 def test_1565150(self):
493 t1 = 1159195039.25
494 os.utime(self.fname, (t1, t1))
495 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000496
Serhiy Storchaka43767632013-11-03 21:31:38 +0200497 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
498 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
499 "requires NTFS")
500 def test_large_time(self):
501 t1 = 5000000000 # some day in 2128
502 os.utime(self.fname, (t1, t1))
503 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000504
Serhiy Storchaka43767632013-11-03 21:31:38 +0200505 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
506 def test_1686475(self):
507 # Verify that an open file can be stat'ed
508 try:
509 os.stat(r"c:\pagefile.sys")
510 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600511 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200512 except OSError as e:
513 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000514
Serhiy Storchaka43767632013-11-03 21:31:38 +0200515 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
516 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
517 def test_15261(self):
518 # Verify that stat'ing a closed fd does not cause crash
519 r, w = os.pipe()
520 try:
521 os.stat(r) # should not raise error
522 finally:
523 os.close(r)
524 os.close(w)
525 with self.assertRaises(OSError) as ctx:
526 os.stat(r)
527 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100528
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000529from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000530
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000531class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000532 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000533 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000534
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000535 def setUp(self):
536 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000537 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000538 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000539 for key, value in self._reference().items():
540 os.environ[key] = value
541
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000542 def tearDown(self):
543 os.environ.clear()
544 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000545 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000546 os.environb.clear()
547 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000548
Christian Heimes90333392007-11-01 19:08:42 +0000549 def _reference(self):
550 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
551
552 def _empty_mapping(self):
553 os.environ.clear()
554 return os.environ
555
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000556 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300557 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000558 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000559 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300560 os.environ.update(HELLO="World")
561 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
562 value = popen.read().strip()
563 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000564
Ezio Melottic7e139b2012-09-26 20:01:34 +0300565 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000566 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300567 with os.popen(
568 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
569 it = iter(popen)
570 self.assertEqual(next(it), "line1\n")
571 self.assertEqual(next(it), "line2\n")
572 self.assertEqual(next(it), "line3\n")
573 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000574
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000575 # Verify environ keys and values from the OS are of the
576 # correct str type.
577 def test_keyvalue_types(self):
578 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000579 self.assertEqual(type(key), str)
580 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000581
Christian Heimes90333392007-11-01 19:08:42 +0000582 def test_items(self):
583 for key, value in self._reference().items():
584 self.assertEqual(os.environ.get(key), value)
585
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000586 # Issue 7310
587 def test___repr__(self):
588 """Check that the repr() of os.environ looks like environ({...})."""
589 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000590 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
591 '{!r}: {!r}'.format(key, value)
592 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000593
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000594 def test_get_exec_path(self):
595 defpath_list = os.defpath.split(os.pathsep)
596 test_path = ['/monty', '/python', '', '/flying/circus']
597 test_env = {'PATH': os.pathsep.join(test_path)}
598
599 saved_environ = os.environ
600 try:
601 os.environ = dict(test_env)
602 # Test that defaulting to os.environ works.
603 self.assertSequenceEqual(test_path, os.get_exec_path())
604 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
605 finally:
606 os.environ = saved_environ
607
608 # No PATH environment variable
609 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
610 # Empty PATH environment variable
611 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
612 # Supplied PATH environment variable
613 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
614
Victor Stinnerb745a742010-05-18 17:17:23 +0000615 if os.supports_bytes_environ:
616 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000617 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000618 # ignore BytesWarning warning
619 with warnings.catch_warnings(record=True):
620 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000621 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000622 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000623 pass
624 else:
625 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000626
627 # bytes key and/or value
628 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
629 ['abc'])
630 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
631 ['abc'])
632 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
633 ['abc'])
634
635 @unittest.skipUnless(os.supports_bytes_environ,
636 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000637 def test_environb(self):
638 # os.environ -> os.environb
639 value = 'euro\u20ac'
640 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000641 value_bytes = value.encode(sys.getfilesystemencoding(),
642 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000643 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000644 msg = "U+20AC character is not encodable to %s" % (
645 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000646 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000647 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000648 self.assertEqual(os.environ['unicode'], value)
649 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000650
651 # os.environb -> os.environ
652 value = b'\xff'
653 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000654 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000655 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000656 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000657
Charles-François Natali2966f102011-11-26 11:32:46 +0100658 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
659 # #13415).
660 @support.requires_freebsd_version(7)
661 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100662 def test_unset_error(self):
663 if sys.platform == "win32":
664 # an environment variable is limited to 32,767 characters
665 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100666 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100667 else:
668 # "=" is not allowed in a variable name
669 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100670 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100671
Victor Stinner6d101392013-04-14 16:35:04 +0200672 def test_key_type(self):
673 missing = 'missingkey'
674 self.assertNotIn(missing, os.environ)
675
Victor Stinner839e5ea2013-04-14 16:43:03 +0200676 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200677 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200678 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200679 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200680
Victor Stinner839e5ea2013-04-14 16:43:03 +0200681 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200682 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200683 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200684 self.assertTrue(cm.exception.__suppress_context__)
685
Victor Stinner6d101392013-04-14 16:35:04 +0200686
Tim Petersc4e09402003-04-25 07:11:48 +0000687class WalkTests(unittest.TestCase):
688 """Tests for os.walk()."""
689
Charles-François Natali7372b062012-02-05 15:15:38 +0100690 def setUp(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000691 import os
692 from os.path import join
693
694 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000695 # TESTFN/
696 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000697 # tmp1
698 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000699 # tmp2
700 # SUB11/ no kids
701 # SUB2/ a file kid and a dirsymlink kid
702 # tmp3
703 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200704 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000705 # TEST2/
706 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000707 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000708 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000709 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000710 sub2_path = join(walk_path, "SUB2")
711 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000712 tmp2_path = join(sub1_path, "tmp2")
713 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000714 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000715 t2_path = join(support.TESTFN, "TEST2")
716 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200717 link_path = join(sub2_path, "link")
718 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000719
720 # Create stuff.
721 os.makedirs(sub11_path)
722 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000723 os.makedirs(t2_path)
724 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000725 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000726 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
727 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000728 if support.can_symlink():
Jason R. Coombs3a092862013-05-27 23:21:28 -0400729 os.symlink(os.path.abspath(t2_path), link_path)
Jason R. Coombsb501b562013-05-27 23:52:43 -0400730 os.symlink('broken', broken_link_path, True)
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200731 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
Guido van Rossumd8faa362007-04-27 19:54:29 +0000732 else:
733 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000734
735 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000736 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000737 self.assertEqual(len(all), 4)
738 # We can't know which order SUB1 and SUB2 will appear in.
739 # Not flipped: TESTFN, SUB1, SUB11, SUB2
740 # flipped: TESTFN, SUB2, SUB1, SUB11
741 flipped = all[0][1][0] != "SUB1"
742 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200743 all[3 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000744 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000745 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
746 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000747 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000748
749 # Prune the search.
750 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000751 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000752 all.append((root, dirs, files))
753 # Don't descend into SUB1.
754 if 'SUB1' in dirs:
755 # Note that this also mutates the dirs we appended to all!
756 dirs.remove('SUB1')
757 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000758 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200759 all[1][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000760 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000761
762 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000763 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000764 self.assertEqual(len(all), 4)
765 # We can't know which order SUB1 and SUB2 will appear in.
766 # Not flipped: SUB11, SUB1, SUB2, TESTFN
767 # flipped: SUB2, SUB11, SUB1, TESTFN
768 flipped = all[3][1][0] != "SUB1"
769 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200770 all[2 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000771 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000772 self.assertEqual(all[flipped], (sub11_path, [], []))
773 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000774 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000775
Brian Curtin3b4499c2010-12-28 14:31:47 +0000776 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000777 # Walk, following symlinks.
778 for root, dirs, files in os.walk(walk_path, followlinks=True):
779 if root == link_path:
780 self.assertEqual(dirs, [])
781 self.assertEqual(files, ["tmp4"])
782 break
783 else:
784 self.fail("Didn't follow symlink with followlinks=True")
785
786 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000787 # Tear everything down. This is a decent use for bottom-up on
788 # Windows, which doesn't have a recursive delete command. The
789 # (not so) subtlety is that rmdir will fail unless the dir's
790 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000791 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000792 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000793 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000794 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000795 dirname = os.path.join(root, name)
796 if not os.path.islink(dirname):
797 os.rmdir(dirname)
798 else:
799 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000800 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000801
Charles-François Natali7372b062012-02-05 15:15:38 +0100802
803@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
804class FwalkTests(WalkTests):
805 """Tests for os.fwalk()."""
806
Larry Hastingsc48fe982012-06-25 04:49:05 -0700807 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
808 """
809 compare with walk() results.
810 """
Larry Hastingsb4038062012-07-15 10:57:38 -0700811 walk_kwargs = walk_kwargs.copy()
812 fwalk_kwargs = fwalk_kwargs.copy()
813 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
814 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
815 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -0700816
Charles-François Natali7372b062012-02-05 15:15:38 +0100817 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -0700818 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100819 expected[root] = (set(dirs), set(files))
820
Larry Hastingsc48fe982012-06-25 04:49:05 -0700821 for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100822 self.assertIn(root, expected)
823 self.assertEqual(expected[root], (set(dirs), set(files)))
824
Larry Hastingsc48fe982012-06-25 04:49:05 -0700825 def test_compare_to_walk(self):
826 kwargs = {'top': support.TESTFN}
827 self._compare_to_walk(kwargs, kwargs)
828
Charles-François Natali7372b062012-02-05 15:15:38 +0100829 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -0700830 try:
831 fd = os.open(".", os.O_RDONLY)
832 walk_kwargs = {'top': support.TESTFN}
833 fwalk_kwargs = walk_kwargs.copy()
834 fwalk_kwargs['dir_fd'] = fd
835 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
836 finally:
837 os.close(fd)
838
839 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +0100840 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -0700841 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
842 args = support.TESTFN, topdown, None
843 for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +0100844 # check that the FD is valid
845 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700846 # redundant check
847 os.stat(rootfd)
848 # check that listdir() returns consistent information
849 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100850
851 def test_fd_leak(self):
852 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
853 # we both check that calling fwalk() a large number of times doesn't
854 # yield EMFILE, and that the minimum allocated FD hasn't changed.
855 minfd = os.dup(1)
856 os.close(minfd)
857 for i in range(256):
858 for x in os.fwalk(support.TESTFN):
859 pass
860 newfd = os.dup(1)
861 self.addCleanup(os.close, newfd)
862 self.assertEqual(newfd, minfd)
863
864 def tearDown(self):
865 # cleanup
866 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
867 for name in files:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700868 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100869 for name in dirs:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700870 st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
Larry Hastingsb698d8e2012-06-23 16:55:07 -0700871 if stat.S_ISDIR(st.st_mode):
872 os.rmdir(name, dir_fd=rootfd)
873 else:
874 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100875 os.rmdir(support.TESTFN)
876
877
Guido van Rossume7ba4952007-06-06 23:52:48 +0000878class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000879 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000880 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000881
882 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000883 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000884 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
885 os.makedirs(path) # Should work
886 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
887 os.makedirs(path)
888
889 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000890 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000891 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
892 os.makedirs(path)
893 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
894 'dir5', 'dir6')
895 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000896
Terry Reedy5a22b652010-12-02 07:05:56 +0000897 def test_exist_ok_existing_directory(self):
898 path = os.path.join(support.TESTFN, 'dir1')
899 mode = 0o777
900 old_mask = os.umask(0o022)
901 os.makedirs(path, mode)
902 self.assertRaises(OSError, os.makedirs, path, mode)
903 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -0400904 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +0000905 os.makedirs(path, mode=mode, exist_ok=True)
906 os.umask(old_mask)
907
Terry Jan Reedy4a0b6f72013-08-10 20:58:59 -0400908 @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
Larry Hastingsa27b83a2013-08-08 00:19:50 -0700909 def test_chown_uid_gid_arguments_must_be_index(self):
910 stat = os.stat(support.TESTFN)
911 uid = stat.st_uid
912 gid = stat.st_gid
913 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
914 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
915 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
916 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
917 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
918
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700919 def test_exist_ok_s_isgid_directory(self):
920 path = os.path.join(support.TESTFN, 'dir1')
921 S_ISGID = stat.S_ISGID
922 mode = 0o777
923 old_mask = os.umask(0o022)
924 try:
925 existing_testfn_mode = stat.S_IMODE(
926 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -0700927 try:
928 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -0700929 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -0700930 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700931 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
932 raise unittest.SkipTest('No support for S_ISGID dir mode.')
933 # The os should apply S_ISGID from the parent dir for us, but
934 # this test need not depend on that behavior. Be explicit.
935 os.makedirs(path, mode | S_ISGID)
936 # http://bugs.python.org/issue14992
937 # Should not fail when the bit is already set.
938 os.makedirs(path, mode, exist_ok=True)
939 # remove the bit.
940 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -0400941 # May work even when the bit is not already set when demanded.
942 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700943 finally:
944 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +0000945
946 def test_exist_ok_existing_regular_file(self):
947 base = support.TESTFN
948 path = os.path.join(support.TESTFN, 'dir1')
949 f = open(path, 'w')
950 f.write('abc')
951 f.close()
952 self.assertRaises(OSError, os.makedirs, path)
953 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
954 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
955 os.remove(path)
956
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000957 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000958 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000959 'dir4', 'dir5', 'dir6')
960 # If the tests failed, the bottom-most directory ('../dir6')
961 # may not have been created, so we look for the outermost directory
962 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000963 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000964 path = os.path.dirname(path)
965
966 os.removedirs(path)
967
Andrew Svetlov405faed2012-12-25 12:18:09 +0200968
969class RemoveDirsTests(unittest.TestCase):
970 def setUp(self):
971 os.makedirs(support.TESTFN)
972
973 def tearDown(self):
974 support.rmtree(support.TESTFN)
975
976 def test_remove_all(self):
977 dira = os.path.join(support.TESTFN, 'dira')
978 os.mkdir(dira)
979 dirb = os.path.join(dira, 'dirb')
980 os.mkdir(dirb)
981 os.removedirs(dirb)
982 self.assertFalse(os.path.exists(dirb))
983 self.assertFalse(os.path.exists(dira))
984 self.assertFalse(os.path.exists(support.TESTFN))
985
986 def test_remove_partial(self):
987 dira = os.path.join(support.TESTFN, 'dira')
988 os.mkdir(dira)
989 dirb = os.path.join(dira, 'dirb')
990 os.mkdir(dirb)
991 with open(os.path.join(dira, 'file.txt'), 'w') as f:
992 f.write('text')
993 os.removedirs(dirb)
994 self.assertFalse(os.path.exists(dirb))
995 self.assertTrue(os.path.exists(dira))
996 self.assertTrue(os.path.exists(support.TESTFN))
997
998 def test_remove_nothing(self):
999 dira = os.path.join(support.TESTFN, 'dira')
1000 os.mkdir(dira)
1001 dirb = os.path.join(dira, 'dirb')
1002 os.mkdir(dirb)
1003 with open(os.path.join(dirb, 'file.txt'), 'w') as f:
1004 f.write('text')
1005 with self.assertRaises(OSError):
1006 os.removedirs(dirb)
1007 self.assertTrue(os.path.exists(dirb))
1008 self.assertTrue(os.path.exists(dira))
1009 self.assertTrue(os.path.exists(support.TESTFN))
1010
1011
Guido van Rossume7ba4952007-06-06 23:52:48 +00001012class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001013 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +02001014 with open(os.devnull, 'wb') as f:
1015 f.write(b'hello')
1016 f.close()
1017 with open(os.devnull, 'rb') as f:
1018 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001019
Andrew Svetlov405faed2012-12-25 12:18:09 +02001020
Guido van Rossume7ba4952007-06-06 23:52:48 +00001021class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001022 def test_urandom_length(self):
1023 self.assertEqual(len(os.urandom(0)), 0)
1024 self.assertEqual(len(os.urandom(1)), 1)
1025 self.assertEqual(len(os.urandom(10)), 10)
1026 self.assertEqual(len(os.urandom(100)), 100)
1027 self.assertEqual(len(os.urandom(1000)), 1000)
1028
1029 def test_urandom_value(self):
1030 data1 = os.urandom(16)
1031 data2 = os.urandom(16)
1032 self.assertNotEqual(data1, data2)
1033
1034 def get_urandom_subprocess(self, count):
1035 code = '\n'.join((
1036 'import os, sys',
1037 'data = os.urandom(%s)' % count,
1038 'sys.stdout.buffer.write(data)',
1039 'sys.stdout.buffer.flush()'))
1040 out = assert_python_ok('-c', code)
1041 stdout = out[1]
1042 self.assertEqual(len(stdout), 16)
1043 return stdout
1044
1045 def test_urandom_subprocess(self):
1046 data1 = self.get_urandom_subprocess(16)
1047 data2 = self.get_urandom_subprocess(16)
1048 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001049
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001050 @unittest.skipUnless(resource, "test requires the resource module")
1051 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001052 # Check urandom() failing when it is not able to open /dev/random.
1053 # We spawn a new process to make the test more robust (if getrlimit()
1054 # failed to restore the file descriptor limit after this, the whole
1055 # test suite would crash; this actually happened on the OS X Tiger
1056 # buildbot).
1057 code = """if 1:
1058 import errno
1059 import os
1060 import resource
1061
1062 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1063 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1064 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001065 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001066 except OSError as e:
1067 assert e.errno == errno.EMFILE, e.errno
1068 else:
1069 raise AssertionError("OSError not raised")
1070 """
1071 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001072
Antoine Pitroue472aea2014-04-26 14:33:03 +02001073 def test_urandom_fd_closed(self):
1074 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1075 # closed.
1076 code = """if 1:
1077 import os
1078 import sys
1079 os.urandom(4)
1080 os.closerange(3, 256)
1081 sys.stdout.buffer.write(os.urandom(4))
1082 """
1083 rc, out, err = assert_python_ok('-Sc', code)
1084
1085 def test_urandom_fd_reopened(self):
1086 # Issue #21207: urandom() should detect its fd to /dev/urandom
1087 # changed to something else, and reopen it.
1088 with open(support.TESTFN, 'wb') as f:
1089 f.write(b"x" * 256)
1090 self.addCleanup(os.unlink, support.TESTFN)
1091 code = """if 1:
1092 import os
1093 import sys
1094 os.urandom(4)
1095 for fd in range(3, 256):
1096 try:
1097 os.close(fd)
1098 except OSError:
1099 pass
1100 else:
1101 # Found the urandom fd (XXX hopefully)
1102 break
1103 os.closerange(3, 256)
1104 with open({TESTFN!r}, 'rb') as f:
1105 os.dup2(f.fileno(), fd)
1106 sys.stdout.buffer.write(os.urandom(4))
1107 sys.stdout.buffer.write(os.urandom(4))
1108 """.format(TESTFN=support.TESTFN)
1109 rc, out, err = assert_python_ok('-Sc', code)
1110 self.assertEqual(len(out), 8)
1111 self.assertNotEqual(out[0:4], out[4:8])
1112 rc, out2, err2 = assert_python_ok('-Sc', code)
1113 self.assertEqual(len(out2), 8)
1114 self.assertNotEqual(out2, out)
1115
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001116
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001117@contextlib.contextmanager
1118def _execvpe_mockup(defpath=None):
1119 """
1120 Stubs out execv and execve functions when used as context manager.
1121 Records exec calls. The mock execv and execve functions always raise an
1122 exception as they would normally never return.
1123 """
1124 # A list of tuples containing (function name, first arg, args)
1125 # of calls to execv or execve that have been made.
1126 calls = []
1127
1128 def mock_execv(name, *args):
1129 calls.append(('execv', name, args))
1130 raise RuntimeError("execv called")
1131
1132 def mock_execve(name, *args):
1133 calls.append(('execve', name, args))
1134 raise OSError(errno.ENOTDIR, "execve called")
1135
1136 try:
1137 orig_execv = os.execv
1138 orig_execve = os.execve
1139 orig_defpath = os.defpath
1140 os.execv = mock_execv
1141 os.execve = mock_execve
1142 if defpath is not None:
1143 os.defpath = defpath
1144 yield calls
1145 finally:
1146 os.execv = orig_execv
1147 os.execve = orig_execve
1148 os.defpath = orig_defpath
1149
Guido van Rossume7ba4952007-06-06 23:52:48 +00001150class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001151 @unittest.skipIf(USING_LINUXTHREADS,
1152 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001153 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001154 self.assertRaises(OSError, os.execvpe, 'no such app-',
1155 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001156
Thomas Heller6790d602007-08-30 17:15:14 +00001157 def test_execvpe_with_bad_arglist(self):
1158 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1159
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001160 @unittest.skipUnless(hasattr(os, '_execvpe'),
1161 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001162 def _test_internal_execvpe(self, test_type):
1163 program_path = os.sep + 'absolutepath'
1164 if test_type is bytes:
1165 program = b'executable'
1166 fullpath = os.path.join(os.fsencode(program_path), program)
1167 native_fullpath = fullpath
1168 arguments = [b'progname', 'arg1', 'arg2']
1169 else:
1170 program = 'executable'
1171 arguments = ['progname', 'arg1', 'arg2']
1172 fullpath = os.path.join(program_path, program)
1173 if os.name != "nt":
1174 native_fullpath = os.fsencode(fullpath)
1175 else:
1176 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001177 env = {'spam': 'beans'}
1178
Victor Stinnerb745a742010-05-18 17:17:23 +00001179 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001180 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001181 self.assertRaises(RuntimeError,
1182 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001183 self.assertEqual(len(calls), 1)
1184 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1185
Victor Stinnerb745a742010-05-18 17:17:23 +00001186 # test os._execvpe() with a relative path:
1187 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001188 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001189 self.assertRaises(OSError,
1190 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001191 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001192 self.assertSequenceEqual(calls[0],
1193 ('execve', native_fullpath, (arguments, env)))
1194
1195 # test os._execvpe() with a relative path:
1196 # os.get_exec_path() reads the 'PATH' variable
1197 with _execvpe_mockup() as calls:
1198 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001199 if test_type is bytes:
1200 env_path[b'PATH'] = program_path
1201 else:
1202 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001203 self.assertRaises(OSError,
1204 os._execvpe, program, arguments, env=env_path)
1205 self.assertEqual(len(calls), 1)
1206 self.assertSequenceEqual(calls[0],
1207 ('execve', native_fullpath, (arguments, env_path)))
1208
1209 def test_internal_execvpe_str(self):
1210 self._test_internal_execvpe(str)
1211 if os.name != "nt":
1212 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001213
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001214
Serhiy Storchaka43767632013-11-03 21:31:38 +02001215@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001216class Win32ErrorTests(unittest.TestCase):
1217 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001218 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001219
1220 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001221 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001222
1223 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001224 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001225
1226 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001227 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001228 try:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001229 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Benjamin Petersonf91df042009-02-13 02:50:59 +00001230 finally:
1231 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001232 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001233
1234 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001235 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001236
Thomas Wouters477c8d52006-05-27 19:21:47 +00001237 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001238 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001239
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001240class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001241 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001242 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1243 #singles.append("close")
1244 #We omit close because it doesn'r raise an exception on some platforms
1245 def get_single(f):
1246 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001247 if hasattr(os, f):
1248 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001249 return helper
1250 for f in singles:
1251 locals()["test_"+f] = get_single(f)
1252
Benjamin Peterson7522c742009-01-19 21:00:09 +00001253 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001254 try:
1255 f(support.make_bad_fd(), *args)
1256 except OSError as e:
1257 self.assertEqual(e.errno, errno.EBADF)
1258 else:
1259 self.fail("%r didn't raise a OSError with a bad file descriptor"
1260 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001261
Serhiy Storchaka43767632013-11-03 21:31:38 +02001262 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001263 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001264 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001265
Serhiy Storchaka43767632013-11-03 21:31:38 +02001266 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001267 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001268 fd = support.make_bad_fd()
1269 # Make sure none of the descriptors we are about to close are
1270 # currently valid (issue 6542).
1271 for i in range(10):
1272 try: os.fstat(fd+i)
1273 except OSError:
1274 pass
1275 else:
1276 break
1277 if i < 2:
1278 raise unittest.SkipTest(
1279 "Unable to acquire a range of invalid file descriptors")
1280 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001281
Serhiy Storchaka43767632013-11-03 21:31:38 +02001282 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001283 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001284 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001285
Serhiy Storchaka43767632013-11-03 21:31:38 +02001286 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001287 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001288 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001289
Serhiy Storchaka43767632013-11-03 21:31:38 +02001290 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001291 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001292 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001293
Serhiy Storchaka43767632013-11-03 21:31:38 +02001294 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001295 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001296 self.check(os.pathconf, "PC_NAME_MAX")
1297 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001298
Serhiy Storchaka43767632013-11-03 21:31:38 +02001299 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001300 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001301 self.check(os.truncate, 0)
1302 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001303
Serhiy Storchaka43767632013-11-03 21:31:38 +02001304 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001305 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001306 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001307
Serhiy Storchaka43767632013-11-03 21:31:38 +02001308 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001309 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001310 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001311
Victor Stinner57ddf782014-01-08 15:21:28 +01001312 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1313 def test_readv(self):
1314 buf = bytearray(10)
1315 self.check(os.readv, [buf])
1316
Serhiy Storchaka43767632013-11-03 21:31:38 +02001317 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001318 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001319 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001320
Serhiy Storchaka43767632013-11-03 21:31:38 +02001321 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001322 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001323 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001324
Victor Stinner57ddf782014-01-08 15:21:28 +01001325 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1326 def test_writev(self):
1327 self.check(os.writev, [b'abc'])
1328
Brian Curtin1b9df392010-11-24 20:24:31 +00001329
1330class LinkTests(unittest.TestCase):
1331 def setUp(self):
1332 self.file1 = support.TESTFN
1333 self.file2 = os.path.join(support.TESTFN + "2")
1334
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001335 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001336 for file in (self.file1, self.file2):
1337 if os.path.exists(file):
1338 os.unlink(file)
1339
Brian Curtin1b9df392010-11-24 20:24:31 +00001340 def _test_link(self, file1, file2):
1341 with open(file1, "w") as f1:
1342 f1.write("test")
1343
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001344 with warnings.catch_warnings():
1345 warnings.simplefilter("ignore", DeprecationWarning)
1346 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001347 with open(file1, "r") as f1, open(file2, "r") as f2:
1348 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1349
1350 def test_link(self):
1351 self._test_link(self.file1, self.file2)
1352
1353 def test_link_bytes(self):
1354 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1355 bytes(self.file2, sys.getfilesystemencoding()))
1356
Brian Curtinf498b752010-11-30 15:54:04 +00001357 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001358 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001359 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001360 except UnicodeError:
1361 raise unittest.SkipTest("Unable to encode for this platform.")
1362
Brian Curtinf498b752010-11-30 15:54:04 +00001363 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001364 self.file2 = self.file1 + "2"
1365 self._test_link(self.file1, self.file2)
1366
Serhiy Storchaka43767632013-11-03 21:31:38 +02001367@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1368class PosixUidGidTests(unittest.TestCase):
1369 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1370 def test_setuid(self):
1371 if os.getuid() != 0:
1372 self.assertRaises(OSError, os.setuid, 0)
1373 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001374
Serhiy Storchaka43767632013-11-03 21:31:38 +02001375 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1376 def test_setgid(self):
1377 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1378 self.assertRaises(OSError, os.setgid, 0)
1379 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001380
Serhiy Storchaka43767632013-11-03 21:31:38 +02001381 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1382 def test_seteuid(self):
1383 if os.getuid() != 0:
1384 self.assertRaises(OSError, os.seteuid, 0)
1385 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001386
Serhiy Storchaka43767632013-11-03 21:31:38 +02001387 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1388 def test_setegid(self):
1389 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1390 self.assertRaises(OSError, os.setegid, 0)
1391 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001392
Serhiy Storchaka43767632013-11-03 21:31:38 +02001393 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1394 def test_setreuid(self):
1395 if os.getuid() != 0:
1396 self.assertRaises(OSError, os.setreuid, 0, 0)
1397 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1398 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001399
Serhiy Storchaka43767632013-11-03 21:31:38 +02001400 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1401 def test_setreuid_neg1(self):
1402 # Needs to accept -1. We run this in a subprocess to avoid
1403 # altering the test runner's process state (issue8045).
1404 subprocess.check_call([
1405 sys.executable, '-c',
1406 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001407
Serhiy Storchaka43767632013-11-03 21:31:38 +02001408 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1409 def test_setregid(self):
1410 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1411 self.assertRaises(OSError, os.setregid, 0, 0)
1412 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1413 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001414
Serhiy Storchaka43767632013-11-03 21:31:38 +02001415 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1416 def test_setregid_neg1(self):
1417 # Needs to accept -1. We run this in a subprocess to avoid
1418 # altering the test runner's process state (issue8045).
1419 subprocess.check_call([
1420 sys.executable, '-c',
1421 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001422
Serhiy Storchaka43767632013-11-03 21:31:38 +02001423@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1424class Pep383Tests(unittest.TestCase):
1425 def setUp(self):
1426 if support.TESTFN_UNENCODABLE:
1427 self.dir = support.TESTFN_UNENCODABLE
1428 elif support.TESTFN_NONASCII:
1429 self.dir = support.TESTFN_NONASCII
1430 else:
1431 self.dir = support.TESTFN
1432 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001433
Serhiy Storchaka43767632013-11-03 21:31:38 +02001434 bytesfn = []
1435 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001436 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001437 fn = os.fsencode(fn)
1438 except UnicodeEncodeError:
1439 return
1440 bytesfn.append(fn)
1441 add_filename(support.TESTFN_UNICODE)
1442 if support.TESTFN_UNENCODABLE:
1443 add_filename(support.TESTFN_UNENCODABLE)
1444 if support.TESTFN_NONASCII:
1445 add_filename(support.TESTFN_NONASCII)
1446 if not bytesfn:
1447 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001448
Serhiy Storchaka43767632013-11-03 21:31:38 +02001449 self.unicodefn = set()
1450 os.mkdir(self.dir)
1451 try:
1452 for fn in bytesfn:
1453 support.create_empty_file(os.path.join(self.bdir, fn))
1454 fn = os.fsdecode(fn)
1455 if fn in self.unicodefn:
1456 raise ValueError("duplicate filename")
1457 self.unicodefn.add(fn)
1458 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001459 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001460 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001461
Serhiy Storchaka43767632013-11-03 21:31:38 +02001462 def tearDown(self):
1463 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001464
Serhiy Storchaka43767632013-11-03 21:31:38 +02001465 def test_listdir(self):
1466 expected = self.unicodefn
1467 found = set(os.listdir(self.dir))
1468 self.assertEqual(found, expected)
1469 # test listdir without arguments
1470 current_directory = os.getcwd()
1471 try:
1472 os.chdir(os.sep)
1473 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1474 finally:
1475 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001476
Serhiy Storchaka43767632013-11-03 21:31:38 +02001477 def test_open(self):
1478 for fn in self.unicodefn:
1479 f = open(os.path.join(self.dir, fn), 'rb')
1480 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001481
Serhiy Storchaka43767632013-11-03 21:31:38 +02001482 @unittest.skipUnless(hasattr(os, 'statvfs'),
1483 "need os.statvfs()")
1484 def test_statvfs(self):
1485 # issue #9645
1486 for fn in self.unicodefn:
1487 # should not fail with file not found error
1488 fullname = os.path.join(self.dir, fn)
1489 os.statvfs(fullname)
1490
1491 def test_stat(self):
1492 for fn in self.unicodefn:
1493 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001494
Brian Curtineb24d742010-04-12 17:16:38 +00001495@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1496class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001497 def _kill(self, sig):
1498 # Start sys.executable as a subprocess and communicate from the
1499 # subprocess to the parent that the interpreter is ready. When it
1500 # becomes ready, send *sig* via os.kill to the subprocess and check
1501 # that the return code is equal to *sig*.
1502 import ctypes
1503 from ctypes import wintypes
1504 import msvcrt
1505
1506 # Since we can't access the contents of the process' stdout until the
1507 # process has exited, use PeekNamedPipe to see what's inside stdout
1508 # without waiting. This is done so we can tell that the interpreter
1509 # is started and running at a point where it could handle a signal.
1510 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1511 PeekNamedPipe.restype = wintypes.BOOL
1512 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1513 ctypes.POINTER(ctypes.c_char), # stdout buf
1514 wintypes.DWORD, # Buffer size
1515 ctypes.POINTER(wintypes.DWORD), # bytes read
1516 ctypes.POINTER(wintypes.DWORD), # bytes avail
1517 ctypes.POINTER(wintypes.DWORD)) # bytes left
1518 msg = "running"
1519 proc = subprocess.Popen([sys.executable, "-c",
1520 "import sys;"
1521 "sys.stdout.write('{}');"
1522 "sys.stdout.flush();"
1523 "input()".format(msg)],
1524 stdout=subprocess.PIPE,
1525 stderr=subprocess.PIPE,
1526 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001527 self.addCleanup(proc.stdout.close)
1528 self.addCleanup(proc.stderr.close)
1529 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001530
1531 count, max = 0, 100
1532 while count < max and proc.poll() is None:
1533 # Create a string buffer to store the result of stdout from the pipe
1534 buf = ctypes.create_string_buffer(len(msg))
1535 # Obtain the text currently in proc.stdout
1536 # Bytes read/avail/left are left as NULL and unused
1537 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1538 buf, ctypes.sizeof(buf), None, None, None)
1539 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1540 if buf.value:
1541 self.assertEqual(msg, buf.value.decode())
1542 break
1543 time.sleep(0.1)
1544 count += 1
1545 else:
1546 self.fail("Did not receive communication from the subprocess")
1547
Brian Curtineb24d742010-04-12 17:16:38 +00001548 os.kill(proc.pid, sig)
1549 self.assertEqual(proc.wait(), sig)
1550
1551 def test_kill_sigterm(self):
1552 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001553 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001554
1555 def test_kill_int(self):
1556 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001557 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001558
1559 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001560 tagname = "test_os_%s" % uuid.uuid1()
1561 m = mmap.mmap(-1, 1, tagname)
1562 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001563 # Run a script which has console control handling enabled.
1564 proc = subprocess.Popen([sys.executable,
1565 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001566 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001567 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1568 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001569 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001570 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001571 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001572 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001573 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001574 count += 1
1575 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001576 # Forcefully kill the process if we weren't able to signal it.
1577 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001578 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001579 os.kill(proc.pid, event)
1580 # proc.send_signal(event) could also be done here.
1581 # Allow time for the signal to be passed and the process to exit.
1582 time.sleep(0.5)
1583 if not proc.poll():
1584 # Forcefully kill the process if we weren't able to signal it.
1585 os.kill(proc.pid, signal.SIGINT)
1586 self.fail("subprocess did not stop on {}".format(name))
1587
1588 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1589 def test_CTRL_C_EVENT(self):
1590 from ctypes import wintypes
1591 import ctypes
1592
1593 # Make a NULL value by creating a pointer with no argument.
1594 NULL = ctypes.POINTER(ctypes.c_int)()
1595 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1596 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1597 wintypes.BOOL)
1598 SetConsoleCtrlHandler.restype = wintypes.BOOL
1599
1600 # Calling this with NULL and FALSE causes the calling process to
1601 # handle CTRL+C, rather than ignore it. This property is inherited
1602 # by subprocesses.
1603 SetConsoleCtrlHandler(NULL, 0)
1604
1605 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1606
1607 def test_CTRL_BREAK_EVENT(self):
1608 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1609
1610
Brian Curtind40e6f72010-07-08 21:39:08 +00001611@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001612class Win32ListdirTests(unittest.TestCase):
1613 """Test listdir on Windows."""
1614
1615 def setUp(self):
1616 self.created_paths = []
1617 for i in range(2):
1618 dir_name = 'SUB%d' % i
1619 dir_path = os.path.join(support.TESTFN, dir_name)
1620 file_name = 'FILE%d' % i
1621 file_path = os.path.join(support.TESTFN, file_name)
1622 os.makedirs(dir_path)
1623 with open(file_path, 'w') as f:
1624 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1625 self.created_paths.extend([dir_name, file_name])
1626 self.created_paths.sort()
1627
1628 def tearDown(self):
1629 shutil.rmtree(support.TESTFN)
1630
1631 def test_listdir_no_extended_path(self):
1632 """Test when the path is not an "extended" path."""
1633 # unicode
1634 self.assertEqual(
1635 sorted(os.listdir(support.TESTFN)),
1636 self.created_paths)
1637 # bytes
1638 self.assertEqual(
1639 sorted(os.listdir(os.fsencode(support.TESTFN))),
1640 [os.fsencode(path) for path in self.created_paths])
1641
1642 def test_listdir_extended_path(self):
1643 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01001644 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01001645 # unicode
1646 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
1647 self.assertEqual(
1648 sorted(os.listdir(path)),
1649 self.created_paths)
1650 # bytes
1651 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
1652 self.assertEqual(
1653 sorted(os.listdir(path)),
1654 [os.fsencode(path) for path in self.created_paths])
1655
1656
1657@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001658@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001659class Win32SymlinkTests(unittest.TestCase):
1660 filelink = 'filelinktest'
1661 filelink_target = os.path.abspath(__file__)
1662 dirlink = 'dirlinktest'
1663 dirlink_target = os.path.dirname(filelink_target)
1664 missing_link = 'missing link'
1665
1666 def setUp(self):
1667 assert os.path.exists(self.dirlink_target)
1668 assert os.path.exists(self.filelink_target)
1669 assert not os.path.exists(self.dirlink)
1670 assert not os.path.exists(self.filelink)
1671 assert not os.path.exists(self.missing_link)
1672
1673 def tearDown(self):
1674 if os.path.exists(self.filelink):
1675 os.remove(self.filelink)
1676 if os.path.exists(self.dirlink):
1677 os.rmdir(self.dirlink)
1678 if os.path.lexists(self.missing_link):
1679 os.remove(self.missing_link)
1680
1681 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001682 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001683 self.assertTrue(os.path.exists(self.dirlink))
1684 self.assertTrue(os.path.isdir(self.dirlink))
1685 self.assertTrue(os.path.islink(self.dirlink))
1686 self.check_stat(self.dirlink, self.dirlink_target)
1687
1688 def test_file_link(self):
1689 os.symlink(self.filelink_target, self.filelink)
1690 self.assertTrue(os.path.exists(self.filelink))
1691 self.assertTrue(os.path.isfile(self.filelink))
1692 self.assertTrue(os.path.islink(self.filelink))
1693 self.check_stat(self.filelink, self.filelink_target)
1694
1695 def _create_missing_dir_link(self):
1696 'Create a "directory" link to a non-existent target'
1697 linkname = self.missing_link
1698 if os.path.lexists(linkname):
1699 os.remove(linkname)
1700 target = r'c:\\target does not exist.29r3c740'
1701 assert not os.path.exists(target)
1702 target_is_dir = True
1703 os.symlink(target, linkname, target_is_dir)
1704
1705 def test_remove_directory_link_to_missing_target(self):
1706 self._create_missing_dir_link()
1707 # For compatibility with Unix, os.remove will check the
1708 # directory status and call RemoveDirectory if the symlink
1709 # was created with target_is_dir==True.
1710 os.remove(self.missing_link)
1711
1712 @unittest.skip("currently fails; consider for improvement")
1713 def test_isdir_on_directory_link_to_missing_target(self):
1714 self._create_missing_dir_link()
1715 # consider having isdir return true for directory links
1716 self.assertTrue(os.path.isdir(self.missing_link))
1717
1718 @unittest.skip("currently fails; consider for improvement")
1719 def test_rmdir_on_directory_link_to_missing_target(self):
1720 self._create_missing_dir_link()
1721 # consider allowing rmdir to remove directory links
1722 os.rmdir(self.missing_link)
1723
1724 def check_stat(self, link, target):
1725 self.assertEqual(os.stat(link), os.stat(target))
1726 self.assertNotEqual(os.lstat(link), os.stat(link))
1727
Brian Curtind25aef52011-06-13 15:16:04 -05001728 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001729 with warnings.catch_warnings():
1730 warnings.simplefilter("ignore", DeprecationWarning)
1731 self.assertEqual(os.stat(bytes_link), os.stat(target))
1732 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001733
1734 def test_12084(self):
1735 level1 = os.path.abspath(support.TESTFN)
1736 level2 = os.path.join(level1, "level2")
1737 level3 = os.path.join(level2, "level3")
1738 try:
1739 os.mkdir(level1)
1740 os.mkdir(level2)
1741 os.mkdir(level3)
1742
1743 file1 = os.path.abspath(os.path.join(level1, "file1"))
1744
1745 with open(file1, "w") as f:
1746 f.write("file1")
1747
1748 orig_dir = os.getcwd()
1749 try:
1750 os.chdir(level2)
1751 link = os.path.join(level2, "link")
1752 os.symlink(os.path.relpath(file1), "link")
1753 self.assertIn("link", os.listdir(os.getcwd()))
1754
1755 # Check os.stat calls from the same dir as the link
1756 self.assertEqual(os.stat(file1), os.stat("link"))
1757
1758 # Check os.stat calls from a dir below the link
1759 os.chdir(level1)
1760 self.assertEqual(os.stat(file1),
1761 os.stat(os.path.relpath(link)))
1762
1763 # Check os.stat calls from a dir above the link
1764 os.chdir(level3)
1765 self.assertEqual(os.stat(file1),
1766 os.stat(os.path.relpath(link)))
1767 finally:
1768 os.chdir(orig_dir)
1769 except OSError as err:
1770 self.fail(err)
1771 finally:
1772 os.remove(file1)
1773 shutil.rmtree(level1)
1774
Brian Curtind40e6f72010-07-08 21:39:08 +00001775
Jason R. Coombs3a092862013-05-27 23:21:28 -04001776@support.skip_unless_symlink
1777class NonLocalSymlinkTests(unittest.TestCase):
1778
1779 def setUp(self):
1780 """
1781 Create this structure:
1782
1783 base
1784 \___ some_dir
1785 """
1786 os.makedirs('base/some_dir')
1787
1788 def tearDown(self):
1789 shutil.rmtree('base')
1790
1791 def test_directory_link_nonlocal(self):
1792 """
1793 The symlink target should resolve relative to the link, not relative
1794 to the current directory.
1795
1796 Then, link base/some_link -> base/some_dir and ensure that some_link
1797 is resolved as a directory.
1798
1799 In issue13772, it was discovered that directory detection failed if
1800 the symlink target was not specified relative to the current
1801 directory, which was a defect in the implementation.
1802 """
1803 src = os.path.join('base', 'some_link')
1804 os.symlink('some_dir', src)
1805 assert os.path.isdir(src)
1806
1807
Victor Stinnere8d51452010-08-19 01:05:19 +00001808class FSEncodingTests(unittest.TestCase):
1809 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001810 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1811 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001812
Victor Stinnere8d51452010-08-19 01:05:19 +00001813 def test_identity(self):
1814 # assert fsdecode(fsencode(x)) == x
1815 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1816 try:
1817 bytesfn = os.fsencode(fn)
1818 except UnicodeEncodeError:
1819 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001820 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001821
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001822
Brett Cannonefb00c02012-02-29 18:31:31 -05001823
1824class DeviceEncodingTests(unittest.TestCase):
1825
1826 def test_bad_fd(self):
1827 # Return None when an fd doesn't actually exist.
1828 self.assertIsNone(os.device_encoding(123456))
1829
Philip Jenveye308b7c2012-02-29 16:16:15 -08001830 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
1831 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08001832 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05001833 def test_device_encoding(self):
1834 encoding = os.device_encoding(0)
1835 self.assertIsNotNone(encoding)
1836 self.assertTrue(codecs.lookup(encoding))
1837
1838
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001839class PidTests(unittest.TestCase):
1840 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1841 def test_getppid(self):
1842 p = subprocess.Popen([sys.executable, '-c',
1843 'import os; print(os.getppid())'],
1844 stdout=subprocess.PIPE)
1845 stdout, _ = p.communicate()
1846 # We are the parent of our subprocess
1847 self.assertEqual(int(stdout), os.getpid())
1848
1849
Brian Curtin0151b8e2010-09-24 13:43:43 +00001850# The introduction of this TestCase caused at least two different errors on
1851# *nix buildbots. Temporarily skip this to let the buildbots move along.
1852@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001853@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1854class LoginTests(unittest.TestCase):
1855 def test_getlogin(self):
1856 user_name = os.getlogin()
1857 self.assertNotEqual(len(user_name), 0)
1858
1859
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001860@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1861 "needs os.getpriority and os.setpriority")
1862class ProgramPriorityTests(unittest.TestCase):
1863 """Tests for os.getpriority() and os.setpriority()."""
1864
1865 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001866
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001867 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1868 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1869 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001870 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1871 if base >= 19 and new_prio <= 19:
1872 raise unittest.SkipTest(
1873 "unable to reliably test setpriority at current nice level of %s" % base)
1874 else:
1875 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001876 finally:
1877 try:
1878 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1879 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001880 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001881 raise
1882
1883
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001884if threading is not None:
1885 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001886
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001887 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001888
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001889 def __init__(self, conn):
1890 asynchat.async_chat.__init__(self, conn)
1891 self.in_buffer = []
1892 self.closed = False
1893 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001894
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001895 def handle_read(self):
1896 data = self.recv(4096)
1897 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001898
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001899 def get_data(self):
1900 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001901
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001902 def handle_close(self):
1903 self.close()
1904 self.closed = True
1905
1906 def handle_error(self):
1907 raise
1908
1909 def __init__(self, address):
1910 threading.Thread.__init__(self)
1911 asyncore.dispatcher.__init__(self)
1912 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1913 self.bind(address)
1914 self.listen(5)
1915 self.host, self.port = self.socket.getsockname()[:2]
1916 self.handler_instance = None
1917 self._active = False
1918 self._active_lock = threading.Lock()
1919
1920 # --- public API
1921
1922 @property
1923 def running(self):
1924 return self._active
1925
1926 def start(self):
1927 assert not self.running
1928 self.__flag = threading.Event()
1929 threading.Thread.start(self)
1930 self.__flag.wait()
1931
1932 def stop(self):
1933 assert self.running
1934 self._active = False
1935 self.join()
1936
1937 def wait(self):
1938 # wait for handler connection to be closed, then stop the server
1939 while not getattr(self.handler_instance, "closed", False):
1940 time.sleep(0.001)
1941 self.stop()
1942
1943 # --- internals
1944
1945 def run(self):
1946 self._active = True
1947 self.__flag.set()
1948 while self._active and asyncore.socket_map:
1949 self._active_lock.acquire()
1950 asyncore.loop(timeout=0.001, count=1)
1951 self._active_lock.release()
1952 asyncore.close_all()
1953
1954 def handle_accept(self):
1955 conn, addr = self.accept()
1956 self.handler_instance = self.Handler(conn)
1957
1958 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001959 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001960 handle_read = handle_connect
1961
1962 def writable(self):
1963 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001964
1965 def handle_error(self):
1966 raise
1967
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001968
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001969@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001970@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1971class TestSendfile(unittest.TestCase):
1972
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001973 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001974 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001975 not sys.platform.startswith("solaris") and \
1976 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02001977 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
1978 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001979
1980 @classmethod
1981 def setUpClass(cls):
1982 with open(support.TESTFN, "wb") as f:
1983 f.write(cls.DATA)
1984
1985 @classmethod
1986 def tearDownClass(cls):
1987 support.unlink(support.TESTFN)
1988
1989 def setUp(self):
1990 self.server = SendfileTestServer((support.HOST, 0))
1991 self.server.start()
1992 self.client = socket.socket()
1993 self.client.connect((self.server.host, self.server.port))
1994 self.client.settimeout(1)
1995 # synchronize by waiting for "220 ready" response
1996 self.client.recv(1024)
1997 self.sockno = self.client.fileno()
1998 self.file = open(support.TESTFN, 'rb')
1999 self.fileno = self.file.fileno()
2000
2001 def tearDown(self):
2002 self.file.close()
2003 self.client.close()
2004 if self.server.running:
2005 self.server.stop()
2006
2007 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2008 """A higher level wrapper representing how an application is
2009 supposed to use sendfile().
2010 """
2011 while 1:
2012 try:
2013 if self.SUPPORT_HEADERS_TRAILERS:
2014 return os.sendfile(sock, file, offset, nbytes, headers,
2015 trailers)
2016 else:
2017 return os.sendfile(sock, file, offset, nbytes)
2018 except OSError as err:
2019 if err.errno == errno.ECONNRESET:
2020 # disconnected
2021 raise
2022 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2023 # we have to retry send data
2024 continue
2025 else:
2026 raise
2027
2028 def test_send_whole_file(self):
2029 # normal send
2030 total_sent = 0
2031 offset = 0
2032 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002033 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002034 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2035 if sent == 0:
2036 break
2037 offset += sent
2038 total_sent += sent
2039 self.assertTrue(sent <= nbytes)
2040 self.assertEqual(offset, total_sent)
2041
2042 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002043 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002044 self.client.close()
2045 self.server.wait()
2046 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002047 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002048 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002049
2050 def test_send_at_certain_offset(self):
2051 # start sending a file at a certain offset
2052 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002053 offset = len(self.DATA) // 2
2054 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002055 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002056 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002057 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2058 if sent == 0:
2059 break
2060 offset += sent
2061 total_sent += sent
2062 self.assertTrue(sent <= nbytes)
2063
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002064 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002065 self.client.close()
2066 self.server.wait()
2067 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002068 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002069 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002070 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002071 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002072
2073 def test_offset_overflow(self):
2074 # specify an offset > file size
2075 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002076 try:
2077 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2078 except OSError as e:
2079 # Solaris can raise EINVAL if offset >= file length, ignore.
2080 if e.errno != errno.EINVAL:
2081 raise
2082 else:
2083 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002084 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002085 self.client.close()
2086 self.server.wait()
2087 data = self.server.handler_instance.get_data()
2088 self.assertEqual(data, b'')
2089
2090 def test_invalid_offset(self):
2091 with self.assertRaises(OSError) as cm:
2092 os.sendfile(self.sockno, self.fileno, -1, 4096)
2093 self.assertEqual(cm.exception.errno, errno.EINVAL)
2094
2095 # --- headers / trailers tests
2096
Serhiy Storchaka43767632013-11-03 21:31:38 +02002097 @requires_headers_trailers
2098 def test_headers(self):
2099 total_sent = 0
2100 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2101 headers=[b"x" * 512])
2102 total_sent += sent
2103 offset = 4096
2104 nbytes = 4096
2105 while 1:
2106 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2107 offset, nbytes)
2108 if sent == 0:
2109 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002110 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002111 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002112
Serhiy Storchaka43767632013-11-03 21:31:38 +02002113 expected_data = b"x" * 512 + self.DATA
2114 self.assertEqual(total_sent, len(expected_data))
2115 self.client.close()
2116 self.server.wait()
2117 data = self.server.handler_instance.get_data()
2118 self.assertEqual(hash(data), hash(expected_data))
2119
2120 @requires_headers_trailers
2121 def test_trailers(self):
2122 TESTFN2 = support.TESTFN + "2"
2123 file_data = b"abcdef"
2124 with open(TESTFN2, 'wb') as f:
2125 f.write(file_data)
2126 with open(TESTFN2, 'rb')as f:
2127 self.addCleanup(os.remove, TESTFN2)
2128 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2129 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002130 self.client.close()
2131 self.server.wait()
2132 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002133 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002134
Serhiy Storchaka43767632013-11-03 21:31:38 +02002135 @requires_headers_trailers
2136 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2137 'test needs os.SF_NODISKIO')
2138 def test_flags(self):
2139 try:
2140 os.sendfile(self.sockno, self.fileno, 0, 4096,
2141 flags=os.SF_NODISKIO)
2142 except OSError as err:
2143 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2144 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002145
2146
Larry Hastings9cf065c2012-06-22 16:30:09 -07002147def supports_extended_attributes():
2148 if not hasattr(os, "setxattr"):
2149 return False
2150 try:
2151 with open(support.TESTFN, "wb") as fp:
2152 try:
2153 os.setxattr(fp.fileno(), b"user.test", b"")
2154 except OSError:
2155 return False
2156 finally:
2157 support.unlink(support.TESTFN)
2158 # Kernels < 2.6.39 don't respect setxattr flags.
2159 kernel_version = platform.release()
2160 m = re.match("2.6.(\d{1,2})", kernel_version)
2161 return m is None or int(m.group(1)) >= 39
2162
2163
2164@unittest.skipUnless(supports_extended_attributes(),
2165 "no non-broken extended attribute support")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002166class ExtendedAttributeTests(unittest.TestCase):
2167
2168 def tearDown(self):
2169 support.unlink(support.TESTFN)
2170
Larry Hastings9cf065c2012-06-22 16:30:09 -07002171 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002172 fn = support.TESTFN
2173 open(fn, "wb").close()
2174 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002175 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002176 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002177 init_xattr = listxattr(fn)
2178 self.assertIsInstance(init_xattr, list)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002179 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002180 xattr = set(init_xattr)
2181 xattr.add("user.test")
2182 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002183 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2184 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2185 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002186 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002187 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002188 self.assertEqual(cm.exception.errno, errno.EEXIST)
2189 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002190 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002191 self.assertEqual(cm.exception.errno, errno.ENODATA)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002192 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002193 xattr.add("user.test2")
2194 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002195 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002196 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002197 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002198 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002199 xattr.remove("user.test")
2200 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002201 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2202 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2203 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2204 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002205 many = sorted("user.test{}".format(i) for i in range(100))
2206 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002207 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002208 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002209
Larry Hastings9cf065c2012-06-22 16:30:09 -07002210 def _check_xattrs(self, *args, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002211 def make_bytes(s):
2212 return bytes(s, "ascii")
Larry Hastings9cf065c2012-06-22 16:30:09 -07002213 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002214 support.unlink(support.TESTFN)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002215 self._check_xattrs_str(make_bytes, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002216
2217 def test_simple(self):
2218 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2219 os.listxattr)
2220
2221 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002222 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2223 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002224
2225 def test_fds(self):
2226 def getxattr(path, *args):
2227 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002228 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002229 def setxattr(path, *args):
2230 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002231 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002232 def removexattr(path, *args):
2233 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002234 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002235 def listxattr(path, *args):
2236 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002237 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002238 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2239
2240
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002241@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2242class Win32DeprecatedBytesAPI(unittest.TestCase):
2243 def test_deprecated(self):
2244 import nt
2245 filename = os.fsencode(support.TESTFN)
2246 with warnings.catch_warnings():
2247 warnings.simplefilter("error", DeprecationWarning)
2248 for func, *args in (
2249 (nt._getfullpathname, filename),
2250 (nt._isdir, filename),
2251 (os.access, filename, os.R_OK),
2252 (os.chdir, filename),
2253 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01002254 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002255 (os.link, filename, filename),
2256 (os.listdir, filename),
2257 (os.lstat, filename),
2258 (os.mkdir, filename),
2259 (os.open, filename, os.O_RDONLY),
2260 (os.rename, filename, filename),
2261 (os.rmdir, filename),
2262 (os.startfile, filename),
2263 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002264 (os.unlink, filename),
2265 (os.utime, filename),
2266 ):
2267 self.assertRaises(DeprecationWarning, func, *args)
2268
Victor Stinner28216442011-11-16 00:34:44 +01002269 @support.skip_unless_symlink
2270 def test_symlink(self):
2271 filename = os.fsencode(support.TESTFN)
2272 with warnings.catch_warnings():
2273 warnings.simplefilter("error", DeprecationWarning)
2274 self.assertRaises(DeprecationWarning,
2275 os.symlink, filename, filename)
2276
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002277
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002278@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2279class TermsizeTests(unittest.TestCase):
2280 def test_does_not_crash(self):
2281 """Check if get_terminal_size() returns a meaningful value.
2282
2283 There's no easy portable way to actually check the size of the
2284 terminal, so let's check if it returns something sensible instead.
2285 """
2286 try:
2287 size = os.get_terminal_size()
2288 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002289 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002290 # Under win32 a generic OSError can be thrown if the
2291 # handle cannot be retrieved
2292 self.skipTest("failed to query terminal size")
2293 raise
2294
Antoine Pitroucfade362012-02-08 23:48:59 +01002295 self.assertGreaterEqual(size.columns, 0)
2296 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002297
2298 def test_stty_match(self):
2299 """Check if stty returns the same results
2300
2301 stty actually tests stdin, so get_terminal_size is invoked on
2302 stdin explicitly. If stty succeeded, then get_terminal_size()
2303 should work too.
2304 """
2305 try:
2306 size = subprocess.check_output(['stty', 'size']).decode().split()
2307 except (FileNotFoundError, subprocess.CalledProcessError):
2308 self.skipTest("stty invocation failed")
2309 expected = (int(size[1]), int(size[0])) # reversed order
2310
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002311 try:
2312 actual = os.get_terminal_size(sys.__stdin__.fileno())
2313 except OSError as e:
2314 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2315 # Under win32 a generic OSError can be thrown if the
2316 # handle cannot be retrieved
2317 self.skipTest("failed to query terminal size")
2318 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002319 self.assertEqual(expected, actual)
2320
2321
Victor Stinner292c8352012-10-30 02:17:38 +01002322class OSErrorTests(unittest.TestCase):
2323 def setUp(self):
2324 class Str(str):
2325 pass
2326
Victor Stinnerafe17062012-10-31 22:47:43 +01002327 self.bytes_filenames = []
2328 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002329 if support.TESTFN_UNENCODABLE is not None:
2330 decoded = support.TESTFN_UNENCODABLE
2331 else:
2332 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002333 self.unicode_filenames.append(decoded)
2334 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002335 if support.TESTFN_UNDECODABLE is not None:
2336 encoded = support.TESTFN_UNDECODABLE
2337 else:
2338 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002339 self.bytes_filenames.append(encoded)
2340 self.bytes_filenames.append(memoryview(encoded))
2341
2342 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002343
2344 def test_oserror_filename(self):
2345 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002346 (self.filenames, os.chdir,),
2347 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002348 (self.filenames, os.lstat,),
2349 (self.filenames, os.open, os.O_RDONLY),
2350 (self.filenames, os.rmdir,),
2351 (self.filenames, os.stat,),
2352 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002353 ]
2354 if sys.platform == "win32":
2355 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002356 (self.bytes_filenames, os.rename, b"dst"),
2357 (self.bytes_filenames, os.replace, b"dst"),
2358 (self.unicode_filenames, os.rename, "dst"),
2359 (self.unicode_filenames, os.replace, "dst"),
Victor Stinner64e039a2012-11-07 00:10:14 +01002360 # Issue #16414: Don't test undecodable names with listdir()
2361 # because of a Windows bug.
2362 #
2363 # With the ANSI code page 932, os.listdir(b'\xe7') return an
2364 # empty list (instead of failing), whereas os.listdir(b'\xff')
2365 # raises a FileNotFoundError. It looks like a Windows bug:
2366 # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
2367 # fails with ERROR_FILE_NOT_FOUND (2), instead of
2368 # ERROR_PATH_NOT_FOUND (3).
2369 (self.unicode_filenames, os.listdir,),
Victor Stinner292c8352012-10-30 02:17:38 +01002370 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002371 else:
2372 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002373 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002374 (self.filenames, os.rename, "dst"),
2375 (self.filenames, os.replace, "dst"),
2376 ))
2377 if hasattr(os, "chown"):
2378 funcs.append((self.filenames, os.chown, 0, 0))
2379 if hasattr(os, "lchown"):
2380 funcs.append((self.filenames, os.lchown, 0, 0))
2381 if hasattr(os, "truncate"):
2382 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002383 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002384 funcs.append((self.filenames, os.chflags, 0))
2385 if hasattr(os, "lchflags"):
2386 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002387 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002388 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002389 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002390 if sys.platform == "win32":
2391 funcs.append((self.bytes_filenames, os.link, b"dst"))
2392 funcs.append((self.unicode_filenames, os.link, "dst"))
2393 else:
2394 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002395 if hasattr(os, "listxattr"):
2396 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002397 (self.filenames, os.listxattr,),
2398 (self.filenames, os.getxattr, "user.test"),
2399 (self.filenames, os.setxattr, "user.test", b'user'),
2400 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002401 ))
2402 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002403 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002404 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002405 if sys.platform == "win32":
2406 funcs.append((self.unicode_filenames, os.readlink,))
2407 else:
2408 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002409
Victor Stinnerafe17062012-10-31 22:47:43 +01002410 for filenames, func, *func_args in funcs:
2411 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002412 try:
2413 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002414 except OSError as err:
Victor Stinner292c8352012-10-30 02:17:38 +01002415 self.assertIs(err.filename, name)
2416 else:
2417 self.fail("No exception thrown by {}".format(func))
2418
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002419class CPUCountTests(unittest.TestCase):
2420 def test_cpu_count(self):
2421 cpus = os.cpu_count()
2422 if cpus is not None:
2423 self.assertIsInstance(cpus, int)
2424 self.assertGreater(cpus, 0)
2425 else:
2426 self.skipTest("Could not determine the number of CPUs")
2427
Victor Stinnerdaf45552013-08-28 00:53:59 +02002428
2429class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002430 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002431 fd = os.open(__file__, os.O_RDONLY)
2432 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002433 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002434
Victor Stinnerdaf45552013-08-28 00:53:59 +02002435 os.set_inheritable(fd, True)
2436 self.assertEqual(os.get_inheritable(fd), True)
2437
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002438 @unittest.skipIf(fcntl is None, "need fcntl")
2439 def test_get_inheritable_cloexec(self):
2440 fd = os.open(__file__, os.O_RDONLY)
2441 self.addCleanup(os.close, fd)
2442 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002443
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002444 # clear FD_CLOEXEC flag
2445 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2446 flags &= ~fcntl.FD_CLOEXEC
2447 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002448
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002449 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002450
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002451 @unittest.skipIf(fcntl is None, "need fcntl")
2452 def test_set_inheritable_cloexec(self):
2453 fd = os.open(__file__, os.O_RDONLY)
2454 self.addCleanup(os.close, fd)
2455 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2456 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002457
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002458 os.set_inheritable(fd, True)
2459 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2460 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002461
Victor Stinnerdaf45552013-08-28 00:53:59 +02002462 def test_open(self):
2463 fd = os.open(__file__, os.O_RDONLY)
2464 self.addCleanup(os.close, fd)
2465 self.assertEqual(os.get_inheritable(fd), False)
2466
2467 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2468 def test_pipe(self):
2469 rfd, wfd = os.pipe()
2470 self.addCleanup(os.close, rfd)
2471 self.addCleanup(os.close, wfd)
2472 self.assertEqual(os.get_inheritable(rfd), False)
2473 self.assertEqual(os.get_inheritable(wfd), False)
2474
2475 def test_dup(self):
2476 fd1 = os.open(__file__, os.O_RDONLY)
2477 self.addCleanup(os.close, fd1)
2478
2479 fd2 = os.dup(fd1)
2480 self.addCleanup(os.close, fd2)
2481 self.assertEqual(os.get_inheritable(fd2), False)
2482
2483 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2484 def test_dup2(self):
2485 fd = os.open(__file__, os.O_RDONLY)
2486 self.addCleanup(os.close, fd)
2487
2488 # inheritable by default
2489 fd2 = os.open(__file__, os.O_RDONLY)
2490 try:
2491 os.dup2(fd, fd2)
2492 self.assertEqual(os.get_inheritable(fd2), True)
2493 finally:
2494 os.close(fd2)
2495
2496 # force non-inheritable
2497 fd3 = os.open(__file__, os.O_RDONLY)
2498 try:
2499 os.dup2(fd, fd3, inheritable=False)
2500 self.assertEqual(os.get_inheritable(fd3), False)
2501 finally:
2502 os.close(fd3)
2503
2504 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2505 def test_openpty(self):
2506 master_fd, slave_fd = os.openpty()
2507 self.addCleanup(os.close, master_fd)
2508 self.addCleanup(os.close, slave_fd)
2509 self.assertEqual(os.get_inheritable(master_fd), False)
2510 self.assertEqual(os.get_inheritable(slave_fd), False)
2511
2512
Antoine Pitrouf26ad712011-07-15 23:00:56 +02002513@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00002514def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002515 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00002516 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002517 StatAttributeTests,
2518 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002519 WalkTests,
Charles-François Natali7372b062012-02-05 15:15:38 +01002520 FwalkTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002521 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00002522 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00002523 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00002524 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002525 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002526 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00002527 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00002528 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002529 Win32KillTests,
Tim Golden781bbeb2013-10-25 20:24:06 +01002530 Win32ListdirTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00002531 Win32SymlinkTests,
Jason R. Coombs3a092862013-05-27 23:21:28 -04002532 NonLocalSymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00002533 FSEncodingTests,
Brett Cannonefb00c02012-02-29 18:31:31 -05002534 DeviceEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002535 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002536 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00002537 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002538 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002539 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04002540 ExtendedAttributeTests,
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002541 Win32DeprecatedBytesAPI,
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002542 TermsizeTests,
Victor Stinner292c8352012-10-30 02:17:38 +01002543 OSErrorTests,
Andrew Svetlov405faed2012-12-25 12:18:09 +02002544 RemoveDirsTests,
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002545 CPUCountTests,
Victor Stinnerdaf45552013-08-28 00:53:59 +02002546 FDInheritanceTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002547 )
Fred Drake2e2be372001-09-20 21:33:42 +00002548
2549if __name__ == "__main__":
2550 test_main()