blob: 50240939d8adc970491ca9bd279a6cc0ecc9d060 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070027import decimal
28import fractions
Christian Heimes25827622013-10-12 01:27:08 +020029import pickle
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000030try:
31 import threading
32except ImportError:
33 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020042
Georg Brandl2daf6ae2012-02-20 19:54:16 +010043from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000044
Victor Stinner034d0aa2012-06-05 01:22:15 +020045with warnings.catch_warnings():
46 warnings.simplefilter("ignore", DeprecationWarning)
47 os.stat_float_times(True)
Victor Stinner1aa54a42012-02-08 04:09:37 +010048st = os.stat(__file__)
49stat_supports_subsecond = (
50 # check if float and int timestamps are different
51 (st.st_atime != st[7])
52 or (st.st_mtime != st[8])
53 or (st.st_ctime != st[9]))
54
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD.
HAVE_WHEEL_GROUP = (
    os.getgid() == 0 if sys.platform.startswith('freebsd') else False)
66
Thomas Wouters0e3f5912006-08-11 14:57:12 +000067# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level file functions of the os module.

    Every test operates on ``support.TESTFN``; the file is removed both
    before and after each test.
    """

    def setUp(self):
        # Also used as tearDown: make sure no TESTFN is left behind.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must not change the refcount of its argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails,
            # so that tearDown can still remove the file.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console and expect exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only and wrap the fd with os.fdopen(fd, *args)."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        # os.replace() must overwrite the existing destination.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
173
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200174
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000175# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for the results of the os.*stat* family and for os.utime().

    Each test runs against a scratch directory ``support.TESTFN`` that
    contains one three-byte file, ``self.fname``.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        """Check indexing, attributes and immutability of os.stat(fname).

        *fname* is the path handed to os.stat(); callers in this class pass
        it both as str and as bytes.
        """
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # Timestamps are compared after truncation to int.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        # Check every protocol instead of relying on the default one.
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                # Protocols below 4 reference the class via a GLOBAL opcode.
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other failure is a real error; do not fall through to a
            # NameError on ``result``.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for index, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[index])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            raise

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def _test_utime(self, filename, attr, utime, delta):
        """Exercise the supported ways of calling utime on *filename*.

        filename -- path whose timestamps are set and read back.
        attr     -- callable ``attr(stat_result, name)`` returning a timestamp.
        utime    -- callable ``utime(filename, times)`` setting explicit times.
        delta    -- tolerated difference between the mtimes produced by two
                    consecutive "set to now" calls, in the unit *attr* returns.
        """
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Stat the path under test, not unconditionally the directory.
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)

    def _test_utime_ns(self, set_times_ns, test_dir=True):
        """Run _test_utime() on the ``*_ns`` stat fields.

        set_times_ns -- callable ``set_times_ns(filename, times)``.
        test_dir     -- when true, also run against the TESTFN directory.
        """
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    # Skip decorators for the optional os.utime() features.
    requires_utime_dir_fd = unittest.skipUnless(
                                os.utime in os.supports_dir_fd,
                                "dir_fd support for utime required for this test.")
    requires_utime_fd = unittest.skipUnless(
                                os.utime in os.supports_fd,
                                "fd support for utime required for this test.")
    requires_utime_nofollow_symlinks = unittest.skipUnless(
                                os.utime in os.supports_follow_symlinks,
                                "follow_symlinks support for utime required for this test.")

    @requires_utime_nofollow_symlinks
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.utime(file, ns=times, follow_symlinks=False)
        self._test_utime_ns(lutimes_ns)

    @requires_utime_fd
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.utime(f.fileno(), ns=times)
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        # Passing both a times tuple and ns= must be rejected.
        with self.assertRaises(ValueError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)

    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        """Set millisecond-precision times through *set_time_func* and check
        that os.stat() reports them to three decimal places.

        set_time_func -- callable ``set_time_func(filename, atime, mtime)``.
        """
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                             times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_utime_nofollow_symlinks
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime), follow_symlinks=False)
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                             times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict tests to Win32, since there is no guarantee other
    # systems support centiseconds
    def get_file_system(path):
        # Plain function (no self): it is called while the class body is
        # executed, to build the skip decorators below.  Returns None on
        # non-Windows platforms or when the volume query fails.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_1565150(self):
        t1 = 1159195039.25
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_large_time(self):
        t1 = 5000000000 # some day in 2128
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100528
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000529from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000530
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000531class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000532 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000533 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000534
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000535 def setUp(self):
536 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000537 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000538 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000539 for key, value in self._reference().items():
540 os.environ[key] = value
541
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000542 def tearDown(self):
543 os.environ.clear()
544 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000545 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000546 os.environb.clear()
547 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000548
Christian Heimes90333392007-11-01 19:08:42 +0000549 def _reference(self):
550 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
551
552 def _empty_mapping(self):
553 os.environ.clear()
554 return os.environ
555
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000556 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300557 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000558 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000559 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300560 os.environ.update(HELLO="World")
561 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
562 value = popen.read().strip()
563 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000564
Ezio Melottic7e139b2012-09-26 20:01:34 +0300565 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000566 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300567 with os.popen(
568 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
569 it = iter(popen)
570 self.assertEqual(next(it), "line1\n")
571 self.assertEqual(next(it), "line2\n")
572 self.assertEqual(next(it), "line3\n")
573 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000574
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000575 # Verify environ keys and values from the OS are of the
576 # correct str type.
577 def test_keyvalue_types(self):
578 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000579 self.assertEqual(type(key), str)
580 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000581
Christian Heimes90333392007-11-01 19:08:42 +0000582 def test_items(self):
583 for key, value in self._reference().items():
584 self.assertEqual(os.environ.get(key), value)
585
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000586 # Issue 7310
587 def test___repr__(self):
588 """Check that the repr() of os.environ looks like environ({...})."""
589 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000590 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
591 '{!r}: {!r}'.format(key, value)
592 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000593
    def test_get_exec_path(self):
        """Check os.get_exec_path() with default, explicit and bytes envs."""
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily rebind os.environ to a plain dict; the original
        # object is put back in the finally clause.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])
634
635 @unittest.skipUnless(os.supports_bytes_environ,
636 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000637 def test_environb(self):
638 # os.environ -> os.environb
639 value = 'euro\u20ac'
640 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000641 value_bytes = value.encode(sys.getfilesystemencoding(),
642 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000643 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000644 msg = "U+20AC character is not encodable to %s" % (
645 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000646 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000647 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000648 self.assertEqual(os.environ['unicode'], value)
649 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000650
651 # os.environb -> os.environ
652 value = b'\xff'
653 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000654 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000655 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000656 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000657
Charles-François Natali2966f102011-11-26 11:32:46 +0100658 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
659 # #13415).
660 @support.requires_freebsd_version(7)
661 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100662 def test_unset_error(self):
663 if sys.platform == "win32":
664 # an environment variable is limited to 32,767 characters
665 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100666 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100667 else:
668 # "=" is not allowed in a variable name
669 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100670 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100671
Victor Stinner6d101392013-04-14 16:35:04 +0200672 def test_key_type(self):
673 missing = 'missingkey'
674 self.assertNotIn(missing, os.environ)
675
Victor Stinner839e5ea2013-04-14 16:43:03 +0200676 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200677 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200678 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200679 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200680
Victor Stinner839e5ea2013-04-14 16:43:03 +0200681 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200682 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200683 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200684 self.assertTrue(cm.exception.__suppress_context__)
685
Victor Stinner6d101392013-04-14 16:35:04 +0200686
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def setUp(self):
        """Create the directory tree that the traversal checks walk over.

        Only fixture creation happens here. The assertions live in
        test_traversal() so that unittest actually runs them for this class
        and so that tearDown() still runs when one of them fails.
        """
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = sub1_path = join(walk_path, "SUB1")
        self.sub11_path = sub11_path = join(sub1_path, "SUB11")
        self.sub2_path = sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), link_path)
            os.symlink('broken', broken_link_path, True)
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_traversal(self):
        """Walk the tree top-down, pruned, bottom-up and through symlinks."""
        walk_path = self.walk_path
        sub1_path = self.sub1_path
        sub11_path = self.sub11_path
        sub2_tree = self.sub2_tree

        # Walk top-down.
        visited = list(os.walk(walk_path))
        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = visited[0][1][0] != "SUB1"
        visited[0][1].sort()
        visited[3 - 2 * flipped][-1].sort()
        self.assertEqual(visited[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[1 + flipped],
                         (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 + flipped], (sub11_path, [], []))
        self.assertEqual(visited[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        visited = []
        for root, dirs, files in os.walk(walk_path):
            visited.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to visited!
                dirs.remove('SUB1')
        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0], (walk_path, ["SUB2"], ["tmp1"]))
        visited[1][-1].sort()
        self.assertEqual(visited[1], sub2_tree)

        # Walk bottom-up.
        visited = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = visited[3][1][0] != "SUB1"
        visited[3][1].sort()
        visited[2 - 2 * flipped][-1].sort()
        self.assertEqual(visited[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[flipped], (sub11_path, [], []))
        self.assertEqual(visited[flipped + 1],
                         (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == self.link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is listed under dirs but has to
                # be removed like a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000801
Charles-François Natali7372b062012-02-05 15:15:38 +0100802
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        Check that os.fwalk() reports the same tree as os.walk().

        Both keyword dicts are copied, then every combination of topdown and
        symlink-following is applied to them. For each combination, every
        root yielded by fwalk() must have been yielded by walk() with the
        same sets of directory and file names.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False),
                                                          repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown,
                                follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # fails there is nothing to close, and the finally clause must not
        # hide the real error behind an unbound-name error.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False),
                                                          repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(
                    *args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)),
                                 set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN,
                                                  topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # lstat-style check: a symlink to a directory is unlinked,
                # only a real directory is rmdir'ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
876
877
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs() (plus one os.chown() argument check)."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5',
                            os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion below fails, so a
        # failure here cannot change the permissions used by later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode,
                              exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776,
                              exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
    def test_chown_uid_gid_arguments_must_be_index(self):
        # Named "st" so the local does not shadow the stat module.
        st = os.stat(support.TESTFN)
        uid = st.st_uid
        gid = st.st_gid
        for value in (-1.0, -1j, decimal.Decimal(-1),
                      fractions.Fraction(-2, 2)):
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            with self.assertRaises(OSError):
                # Should fail when the bit is not already set when demanded.
                os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
968
Andrew Svetlov405faed2012-12-25 12:18:09 +0200969
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        """Create TESTFN/dira/dirb and return (dira, dirb)."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        """With nothing else in the way the whole chain is removed."""
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        for gone in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        """A file in dira stops the removal after dirb."""
        dira, dirb = self._make_nested_dirs()
        with open(os.path.join(dira, 'file.txt'), 'w') as f:
            f.write('text')
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        for kept in (dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        """A file in dirb makes removedirs() raise and remove nothing."""
        dira, dirb = self._make_nested_dirs()
        with open(os.path.join(dirb, 'file.txt'), 'w') as f:
            f.write('text')
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for kept in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1011
1012
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        """Writes to os.devnull are accepted and reads return no data."""
        # The with statement closes the file; no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001020
Andrew Svetlov405faed2012-12-25 12:18:09 +02001021
Guido van Rossume7ba4952007-06-06 23:52:48 +00001022class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001023 def test_urandom_length(self):
1024 self.assertEqual(len(os.urandom(0)), 0)
1025 self.assertEqual(len(os.urandom(1)), 1)
1026 self.assertEqual(len(os.urandom(10)), 10)
1027 self.assertEqual(len(os.urandom(100)), 100)
1028 self.assertEqual(len(os.urandom(1000)), 1000)
1029
1030 def test_urandom_value(self):
1031 data1 = os.urandom(16)
1032 data2 = os.urandom(16)
1033 self.assertNotEqual(data1, data2)
1034
1035 def get_urandom_subprocess(self, count):
1036 code = '\n'.join((
1037 'import os, sys',
1038 'data = os.urandom(%s)' % count,
1039 'sys.stdout.buffer.write(data)',
1040 'sys.stdout.buffer.flush()'))
1041 out = assert_python_ok('-c', code)
1042 stdout = out[1]
1043 self.assertEqual(len(stdout), 16)
1044 return stdout
1045
1046 def test_urandom_subprocess(self):
1047 data1 = self.get_urandom_subprocess(16)
1048 data2 = self.get_urandom_subprocess(16)
1049 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001050
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001051 @unittest.skipUnless(resource, "test requires the resource module")
1052 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001053 # Check urandom() failing when it is not able to open /dev/random.
1054 # We spawn a new process to make the test more robust (if getrlimit()
1055 # failed to restore the file descriptor limit after this, the whole
1056 # test suite would crash; this actually happened on the OS X Tiger
1057 # buildbot).
1058 code = """if 1:
1059 import errno
1060 import os
1061 import resource
1062
1063 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1064 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1065 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001066 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001067 except OSError as e:
1068 assert e.errno == errno.EMFILE, e.errno
1069 else:
1070 raise AssertionError("OSError not raised")
1071 """
1072 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001073
1074
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.

    Yields the list that records every call as a
    (function name, first arg, remaining args) tuple.  The mock execv and
    execve functions always raise an exception as they would normally never
    return: the execv mock raises RuntimeError and the execve mock raises
    OSError with errno ENOTDIR.  When *defpath* is not None it replaces
    os.defpath for the duration of the block.  os.execv, os.execve and
    os.defpath are restored on exit.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Capture the originals before entering the try block so the finally
    # clause can never reference a name that was not bound yet.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1107
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with str or bytes inputs against the mocks."""
        use_bytes = test_type is bytes
        search_dir = os.sep + 'absolutepath'
        if use_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = search_dir
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001171
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001172
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Each call on an unsuitable TESTFN path must raise OSError."""

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir must fail while a regular file of the same name exists.
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)
        finally:
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001197
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001198class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001199 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001200 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1201 #singles.append("close")
1202 #We omit close because it doesn'r raise an exception on some platforms
1203 def get_single(f):
1204 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001205 if hasattr(os, f):
1206 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001207 return helper
1208 for f in singles:
1209 locals()["test_"+f] = get_single(f)
1210
Benjamin Peterson7522c742009-01-19 21:00:09 +00001211 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001212 try:
1213 f(support.make_bad_fd(), *args)
1214 except OSError as e:
1215 self.assertEqual(e.errno, errno.EBADF)
1216 else:
1217 self.fail("%r didn't raise a OSError with a bad file descriptor"
1218 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001219
Serhiy Storchaka43767632013-11-03 21:31:38 +02001220 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001221 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001222 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001223
Serhiy Storchaka43767632013-11-03 21:31:38 +02001224 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001225 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001226 fd = support.make_bad_fd()
1227 # Make sure none of the descriptors we are about to close are
1228 # currently valid (issue 6542).
1229 for i in range(10):
1230 try: os.fstat(fd+i)
1231 except OSError:
1232 pass
1233 else:
1234 break
1235 if i < 2:
1236 raise unittest.SkipTest(
1237 "Unable to acquire a range of invalid file descriptors")
1238 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001239
Serhiy Storchaka43767632013-11-03 21:31:38 +02001240 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001241 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001242 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001243
Serhiy Storchaka43767632013-11-03 21:31:38 +02001244 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001245 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001246 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001247
Serhiy Storchaka43767632013-11-03 21:31:38 +02001248 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001249 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001250 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001251
Serhiy Storchaka43767632013-11-03 21:31:38 +02001252 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001253 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001254 self.check(os.pathconf, "PC_NAME_MAX")
1255 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001256
@unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
def test_ftruncate(self):
    # Runs os.truncate and then os.ftruncate through self.check, each with
    # a target length of 0.
    for func in (os.truncate, os.ftruncate):
        self.check(func, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001261
@unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
def test_lseek(self):
    # Hands os.lseek to self.check with two zero arguments (position, how).
    position = 0
    how = 0
    self.check(os.lseek, position, how)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001265
@unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
def test_read(self):
    # Hands os.read to self.check, asking for a single byte.
    byte_count = 1
    self.check(os.read, byte_count)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001269
@unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
def test_readv(self):
    # Hands os.readv to self.check with a one-element list holding a
    # 10-byte writable buffer.
    buffers = [bytearray(10)]
    self.check(os.readv, buffers)
1274
@unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
def test_tcsetpgrpt(self):
    # Hands os.tcsetpgrp to self.check with a process group argument of 0.
    process_group = 0
    self.check(os.tcsetpgrp, process_group)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001278
@unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
def test_write(self):
    # Hands os.write to self.check with a one-byte payload.
    payload = b" "
    self.check(os.write, payload)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001282
@unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
def test_writev(self):
    # Hands os.writev to self.check with a one-element list of bytes.
    buffers = [b'abc']
    self.check(os.writev, buffers)
1286
Brian Curtin1b9df392010-11-24 20:24:31 +00001287
@unittest.skipUnless(hasattr(os, 'link'), 'test needs os.link()')
class LinkTests(unittest.TestCase):
    """Check that os.link() produces a second name for the same file.

    Each test writes a small file, links it under a second name and then
    verifies with os.path.sameopenfile() that both names open the same
    file.  The whole class is skipped when os.link() is unavailable.
    """

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # The tests may rebind file1/file2, so clean up whatever they
        # currently point at.
        for name in (self.file1, self.file2):
            if os.path.exists(name):
                os.unlink(name)

    def _test_link(self, file1, file2):
        """Create *file1*, link it as *file2* and compare the open files.

        Both arguments are passed unchanged to open() and os.link(), so
        they may be str or bytes.
        """
        with open(file1, "w") as f1:
            f1.write("test")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding together with its
        # error handler, matching what test_unicode_name relies on.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            self.skipTest("Unable to encode for this platform.")

        # Rebinding the attributes lets tearDown remove the renamed files.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1324
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Error-path checks for the os.set*uid() / os.set*gid() family.

    Each test expects OSError when an unprivileged process asks for id 0
    and OverflowError when an id of 1<<32 is passed.
    """

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if not os.getuid() == 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if not os.getuid() == 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if not os.getuid() == 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # The oversized value is rejected in either argument position.
        for ids in ((1 << 32, 0), (0, 1 << 32)):
            with self.assertRaises(OverflowError):
                os.setreuid(*ids)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        child_code = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', child_code])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        # The oversized value is rejected in either argument position.
        for ids in ((1 << 32, 0), (0, 1 << 32)):
            with self.assertRaises(OverflowError):
                os.setregid(*ids)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        child_code = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', child_code])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001380
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Exercise os functions on file names created from encoded bytes.

    setUp() builds a directory of empty files whose names come from the
    support.TESTFN_* constants and records the decoded names in
    self.unicodefn; the tests then list, open and stat those names.
    """

    def setUp(self):
        # First truthy choice wins, falling back to the plain TESTFN.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # TESTFN_UNICODE is always tried; the other two only when set.
        candidates = [support.TESTFN_UNICODE]
        for optional in (support.TESTFN_UNENCODABLE,
                         support.TESTFN_NONASCII):
            if optional:
                candidates.append(optional)

        encoded_names = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                # Not representable with the filesystem encoding.
                continue
            encoded_names.append(encoded)
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in encoded_names:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown is not run when setUp fails, so remove the
            # directory here before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                            "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_path = os.path.join(self.dir, name)
            os.stat(full_path)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001452
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows: exit codes and console events."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_attempts times, 0.1s apart, for the ready message.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait until it
        # flips the shared one-byte mapping to 1, then send *event* to it.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test ends, whatever the outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # NOTE(review): "not proc.poll()" is true both while the child is
        # still running (None) and when it exited with status 0.
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1567
1568
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0, FILE0, SUB1 and FILE1 and keep
        # the sorted entry names for comparison with os.listdir().
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as stream:
                stream.write(
                    "I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)
        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)
1613
1614
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symlink creation, inspection and removal on Windows.

    The links are created under relative names in the current directory
    and point at this test file and at its directory.
    """

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: the targets exist and no link is left over from an
        # earlier run.  TestCase assertions are used so the checks still
        # run under "python -O".
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() because the missing link's target never exists.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same comparisons are repeated with the link name as bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # os.stat() on a relative symlink must give the target's result
        # from the link's directory, from one above it and from one below.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Bound before the try block so the cleanup in "finally" can never
        # hit an unbound name when directory creation fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what exists, so a cleanup error cannot replace
            # the failure reported above.
            if os.path.exists(file1):
                os.remove(file1)
            if os.path.isdir(level1):
                shutil.rmtree(level1)
1732
Brian Curtind40e6f72010-07-08 21:39:08 +00001733
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check a directory symlink whose target is relative to the link."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A TestCase assertion keeps the check alive under "python -O",
        # where a bare assert statement would be compiled away.
        self.assertTrue(os.path.isdir(src))
1764
1765
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() are expected
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        samples = ('unicode\u0141', 'latin\xe9', 'ascii')
        for sample in samples:
            try:
                encoded = os.fsencode(sample)
            except UnicodeEncodeError:
                # The filesystem encoding cannot represent this sample.
                continue
            self.assertEqual(os.fsdecode(encoded), sample)
Victor Stinnere8d51452010-08-19 01:05:19 +00001779
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001780
Brett Cannonefb00c02012-02-29 18:31:31 -05001781
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        nonexistent_fd = 123456
        self.assertIsNone(os.device_encoding(nonexistent_fd))

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win')
            or all(hasattr(locale, attr)
                   for attr in ('nl_langinfo', 'CODESET'))
        ),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a tty here, so an encoding name is expected and it must
        # be one the codecs registry can look up.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
1795
1796
class PidTests(unittest.TestCase):
    """Checks for the process-id helpers in os."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid on stdout.
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1806
1807
Brian Curtin0151b8e2010-09-24 13:43:43 +00001808# The introduction of this TestCase caused at least two different errors on
1809# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check for os.getlogin(); the class is unconditionally skipped."""

    def test_getlogin(self):
        # The login name must not be empty.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
1816
1817
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        raised = base + 1
        os.setpriority(os.PRIO_PROCESS, pid, raised)
        try:
            observed = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and observed <= 19:
                self.skipTest(
      "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(observed, raised)
        finally:
            # Try to put the original value back; only an EACCES refusal
            # is tolerated.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1840
1841
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket served by an asyncore loop in its own thread.

        The Handler created for the most recent accepted connection is kept
        in handler_instance; it buffers everything it receives so a test
        can read it back with get_data().
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that records all received bytes."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting pushed to the peer as soon as it connects.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Read the address back from the socket after binding.
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the thread and return once run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop to finish and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            try:
                while self._active and asyncore.socket_map:
                    # "with" releases the lock even when a handler raises
                    # out of the loop.
                    with self._active_lock:
                        asyncore.loop(timeout=0.001, count=1)
            finally:
                # Close every channel on both the normal and error paths.
                asyncore.close_all()

        def handle_accept(self):
            accepted = self.accept()
            if accepted is None:
                # accept() returns None when no connection is ready
                # after all.
                return
            conn, addr = accepted
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1925
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001926
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001927@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001928@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1929class TestSendfile(unittest.TestCase):
1930
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001931 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001932 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001933 not sys.platform.startswith("solaris") and \
1934 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02001935 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
1936 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001937
1938 @classmethod
1939 def setUpClass(cls):
1940 with open(support.TESTFN, "wb") as f:
1941 f.write(cls.DATA)
1942
1943 @classmethod
1944 def tearDownClass(cls):
1945 support.unlink(support.TESTFN)
1946
1947 def setUp(self):
1948 self.server = SendfileTestServer((support.HOST, 0))
1949 self.server.start()
1950 self.client = socket.socket()
1951 self.client.connect((self.server.host, self.server.port))
1952 self.client.settimeout(1)
1953 # synchronize by waiting for "220 ready" response
1954 self.client.recv(1024)
1955 self.sockno = self.client.fileno()
1956 self.file = open(support.TESTFN, 'rb')
1957 self.fileno = self.file.fileno()
1958
1959 def tearDown(self):
1960 self.file.close()
1961 self.client.close()
1962 if self.server.running:
1963 self.server.stop()
1964
1965 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1966 """A higher level wrapper representing how an application is
1967 supposed to use sendfile().
1968 """
1969 while 1:
1970 try:
1971 if self.SUPPORT_HEADERS_TRAILERS:
1972 return os.sendfile(sock, file, offset, nbytes, headers,
1973 trailers)
1974 else:
1975 return os.sendfile(sock, file, offset, nbytes)
1976 except OSError as err:
1977 if err.errno == errno.ECONNRESET:
1978 # disconnected
1979 raise
1980 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1981 # we have to retry send data
1982 continue
1983 else:
1984 raise
1985
def test_send_whole_file(self):
    # normal send
    chunk_size = 4096
    position = 0
    delivered = 0
    file_size = len(self.DATA)
    while delivered < file_size:
        count = self.sendfile_wrapper(self.sockno, self.fileno,
                                      position, chunk_size)
        if count == 0:
            break
        position += count
        delivered += count
        self.assertTrue(count <= chunk_size)
        # Starting from 0, the offset must track the running total.
        self.assertEqual(position, delivered)

    self.assertEqual(delivered, file_size)
    # Close our side, wait for the server, then compare what it collected.
    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    self.assertEqual(len(received), file_size)
    self.assertEqual(received, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002007
def test_send_at_certain_offset(self):
    # start sending a file at a certain offset
    midpoint = len(self.DATA) // 2
    position = midpoint
    remaining_total = len(self.DATA) - midpoint
    chunk_size = 4096
    delivered = 0
    while delivered < remaining_total:
        count = self.sendfile_wrapper(self.sockno, self.fileno,
                                      position, chunk_size)
        if count == 0:
            break
        position += count
        delivered += count
        self.assertTrue(count <= chunk_size)

    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    # Only the second half of the payload should have arrived.
    expected = self.DATA[len(self.DATA) // 2:]
    self.assertEqual(delivered, len(expected))
    self.assertEqual(len(received), len(expected))
    self.assertEqual(received, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002030
def test_offset_overflow(self):
    # specify an offset > file size
    beyond_eof = len(self.DATA) + 4096
    try:
        count = os.sendfile(self.sockno, self.fileno, beyond_eof, 4096)
    except OSError as exc:
        # Solaris can raise EINVAL if offset >= file length, ignore.
        if exc.errno != errno.EINVAL:
            raise
    else:
        self.assertEqual(count, 0)
    # Either way, the server must not have received any payload.
    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    self.assertEqual(received, b'')
2047
def test_invalid_offset(self):
    # A negative offset must be rejected with EINVAL.
    with self.assertRaises(OSError) as caught:
        os.sendfile(self.sockno, self.fileno, -1, 4096)
    self.assertEqual(caught.exception.errno, errno.EINVAL)
2052
2053 # --- headers / trailers tests
2054
@requires_headers_trailers
def test_headers(self):
    # First call: 512 header bytes plus the first 4096-byte request.
    delivered = 0
    count = os.sendfile(self.sockno, self.fileno, 0, 4096,
                        headers=[b"x" * 512])
    delivered += count
    # Continue from file offset 4096 until sendfile reports nothing sent.
    position = 4096
    chunk_size = 4096
    while True:
        count = self.sendfile_wrapper(self.sockno, self.fileno,
                                      position, chunk_size)
        if count == 0:
            break
        delivered += count
        position += count

    expected_data = b"x" * 512 + self.DATA
    self.assertEqual(delivered, len(expected_data))
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    # Hashes are compared rather than the byte strings themselves.
    self.assertEqual(hash(received), hash(expected_data))
2077
@requires_headers_trailers
def test_trailers(self):
    # Send a small dedicated file followed by a trailer and check that the
    # server sees the file contents with the trailer appended.
    second_path = support.TESTFN + "2"
    payload = b"abcdef"
    with open(second_path, 'wb') as writer:
        writer.write(payload)
    with open(second_path, 'rb') as reader:
        self.addCleanup(os.remove, second_path)
        os.sendfile(self.sockno, reader.fileno(), 0, len(payload),
                    trailers=[b"1234"])
        self.client.close()
        self.server.wait()
        received = self.server.handler_instance.get_data()
        self.assertEqual(received, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002092
@requires_headers_trailers
@unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                     'test needs os.SF_NODISKIO')
def test_flags(self):
    # With SF_NODISKIO the call may fail with EBUSY/EAGAIN; those two
    # errors are tolerated, anything else is a failure.
    tolerated = (errno.EBUSY, errno.EAGAIN)
    try:
        os.sendfile(self.sockno, self.fileno, 0, 4096,
                    flags=os.SF_NODISKIO)
    except OSError as exc:
        if exc.errno not in tolerated:
            raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002103
2104
def supports_extended_attributes():
    """Return True if extended attributes are usable for these tests.

    Returns False when os.setxattr is missing, when setting an attribute on
    a scratch file (support.TESTFN) raises OSError, or when
    platform.release() reports a 2.6.x kernel older than 2.6.39.
    The scratch file is always removed again.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string so "\d" is a regex class rather than a string escape; the
    # dots are escaped so they only match the literal "2.6." prefix.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2120
2121
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    # Exercises get/set/remove/list xattr through paths, follow_symlinks=False
    # and file descriptors, with both str and bytes attribute names.

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # *s* converts an attribute name to the type under test; the four
        # callables are the xattr primitives to exercise.
        path = support.TESTFN
        with open(path, "wb"):
            pass

        # A fresh file has no "user.test" attribute.
        with self.assertRaises(OSError) as caught:
            getxattr(path, s("user.test"), **kwargs)
        self.assertEqual(caught.exception.errno, errno.ENODATA)
        baseline = listxattr(path)
        self.assertIsInstance(baseline, list)

        # Create the attribute with an empty value, then replace it.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(baseline)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name and XATTR_REPLACE on a missing
        # name must both fail.
        with self.assertRaises(OSError) as caught:
            setxattr(path, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
        self.assertEqual(caught.exception.errno, errno.EEXIST)
        with self.assertRaises(OSError) as caught:
            setxattr(path, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(caught.exception.errno, errno.ENODATA)
        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removal makes the attribute disappear again.
        removexattr(path, s("user.test"), **kwargs)
        with self.assertRaises(OSError) as caught:
            getxattr(path, s("user.test"), **kwargs)
        self.assertEqual(caught.exception.errno, errno.ENODATA)
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # A 1024-byte value round-trips.
        setxattr(path, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), b"a"*1024)
        removexattr(path, s("user.test"), **kwargs)

        # Many attributes at once.
        bulk_names = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in bulk_names:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)),
                         set(baseline).union(bulk_names))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario with str names, reset the file, then with bytes.
        def make_bytes(text):
            return bytes(text, "ascii")

        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(make_bytes, *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Adapters that open the path and call the os function on the
        # resulting file descriptor instead of the path.
        def getxattr(path, *args):
            with open(path, "rb") as handle:
                return os.getxattr(handle.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb") as handle:
                os.setxattr(handle.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb") as handle:
                os.removexattr(handle.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as handle:
                return os.listxattr(handle.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2197
2198
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    # Each listed call must raise DeprecationWarning when given a bytes
    # filename, with DeprecationWarning turned into an error.

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            # (function, *arguments) entries, tried in this order.
            calls = (
                (nt._getfullpathname, filename),
                (nt._isdir, filename),
                (os.access, filename, os.R_OK),
                (os.chdir, filename),
                (os.chmod, filename, 0o777),
                (os.getcwdb,),
                (os.link, filename, filename),
                (os.listdir, filename),
                (os.lstat, filename),
                (os.mkdir, filename),
                (os.open, filename, os.O_RDONLY),
                (os.rename, filename, filename),
                (os.rmdir, filename),
                (os.startfile, filename),
                (os.stat, filename),
                (os.unlink, filename),
                (os.utime, filename),
            )
            for func, *args in calls:
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            self.assertRaises(DeprecationWarning,
                              os.symlink, filename, filename)
2234
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002235
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as exc:
            if sys.platform != "win32" and \
               exc.errno not in (errno.EINVAL, errno.ENOTTY):
                raise
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            stty_output = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        fields = stty_output.decode().split()
        expected = (int(fields[1]), int(fields[0]))  # reversed order

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            if sys.platform != "win32" and \
               exc.errno not in (errno.EINVAL, errno.ENOTTY):
                raise
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
2278
2279
class OSErrorTests(unittest.TestCase):
    # Checks that the OSError raised by path-taking os functions carries the
    # very same filename object that was passed in.

    def setUp(self):
        class Str(str):
            pass

        # Use support.TESTFN_UNENCODABLE when available, else plain TESTFN;
        # test both the str itself and a str subclass instance.
        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        self.unicode_filenames = [decoded, Str(decoded)]

        # Same for bytes: the raw bytes object and a memoryview over it.
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)
        self.bytes_filenames = [encoded, memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        names = self.filenames
        bytes_names = self.bytes_filenames
        str_names = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each entry is (filenames to try, function, *extra arguments).
        funcs = [
            (names, os.chdir,),
            (names, os.chmod, 0o777),
            (names, os.lstat,),
            (names, os.open, os.O_RDONLY),
            (names, os.rmdir,),
            (names, os.stat,),
            (names, os.unlink,),
        ]
        if on_windows:
            funcs.extend((
                (bytes_names, os.rename, b"dst"),
                (bytes_names, os.replace, b"dst"),
                (str_names, os.rename, "dst"),
                (str_names, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (str_names, os.listdir,),
            ))
        else:
            funcs.extend((
                (names, os.listdir,),
                (names, os.rename, "dst"),
                (names, os.replace, "dst"),
            ))
        # Optional functions are only exercised where the platform has them.
        if hasattr(os, "chown"):
            funcs.append((names, os.chown, 0, 0))
        if hasattr(os, "lchown"):
            funcs.append((names, os.lchown, 0, 0))
        if hasattr(os, "truncate"):
            funcs.append((names, os.truncate, 0))
        if hasattr(os, "chflags"):
            funcs.append((names, os.chflags, 0))
        if hasattr(os, "lchflags"):
            funcs.append((names, os.lchflags, 0))
        if hasattr(os, "chroot"):
            funcs.append((names, os.chroot,))
        if hasattr(os, "link"):
            if on_windows:
                funcs.append((bytes_names, os.link, b"dst"))
                funcs.append((str_names, os.link, "dst"))
            else:
                funcs.append((names, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs.extend((
                (names, os.listxattr,),
                (names, os.getxattr, "user.test"),
                (names, os.setxattr, "user.test", b'user'),
                (names, os.removexattr, "user.test"),
            ))
        if hasattr(os, "lchmod"):
            funcs.append((names, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            if on_windows:
                funcs.append((str_names, os.readlink,))
            else:
                funcs.append((names, os.readlink,))

        for candidates, func, *func_args in funcs:
            for name in candidates:
                try:
                    func(name, *func_args)
                except OSError as err:
                    # Identity, not equality: the original object is expected.
                    self.assertIs(err.filename, name)
                else:
                    self.fail("No exception thrown by {}".format(func))
2376
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns either None or a positive int.
        cpus = os.cpu_count()
        if cpus is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(cpus, int)
        self.assertGreater(cpus, 0)
2385
Victor Stinnerdaf45552013-08-28 00:53:59 +02002386
class FDInheritanceTests(unittest.TestCase):
    # Descriptors created through the os module are expected to be
    # non-inheritable unless explicitly requested otherwise.

    def test_get_set_inheritable(self):
        descriptor = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, descriptor)
        self.assertEqual(os.get_inheritable(descriptor), False)

        os.set_inheritable(descriptor, True)
        self.assertEqual(os.get_inheritable(descriptor), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        descriptor = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, descriptor)
        self.assertEqual(os.get_inheritable(descriptor), False)

        # clear FD_CLOEXEC flag
        fd_flags = fcntl.fcntl(descriptor, fcntl.F_GETFD)
        fd_flags &= ~fcntl.FD_CLOEXEC
        fcntl.fcntl(descriptor, fcntl.F_SETFD, fd_flags)

        self.assertEqual(os.get_inheritable(descriptor), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        descriptor = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, descriptor)
        # FD_CLOEXEC is set right after open() ...
        self.assertEqual(
            fcntl.fcntl(descriptor, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
            fcntl.FD_CLOEXEC)

        # ... and cleared once the descriptor is made inheritable.
        os.set_inheritable(descriptor, True)
        self.assertEqual(
            fcntl.fcntl(descriptor, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
            0)

    def test_open(self):
        descriptor = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, descriptor)
        self.assertEqual(os.get_inheritable(descriptor), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertEqual(os.get_inheritable(read_end), False)
        self.assertEqual(os.get_inheritable(write_end), False)

    def test_dup(self):
        original = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, original)

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, source_fd)

        # inheritable by default
        target_default = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source_fd, target_default)
            self.assertEqual(os.get_inheritable(target_default), True)
        finally:
            os.close(target_default)

        # force non-inheritable
        target_private = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source_fd, target_private, inheritable=False)
            self.assertEqual(os.get_inheritable(target_private), False)
        finally:
            os.close(target_private)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
2469
2470
@support.reap_threads
def test_main():
    # Test case classes handed to support.run_unittest(), in this order.
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32ListdirTests,
        Win32SymlinkTests,
        NonLocalSymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
        OSErrorTests,
        RemoveDirsTests,
        CPUCountTests,
        FDInheritanceTests,
    )
    support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()