blob: ce6bd91cc720a2798cca12b41b855844e6ebdb95 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070027import decimal
28import fractions
Christian Heimes25827622013-10-12 01:27:08 +020029import pickle
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000030try:
31 import threading
32except ImportError:
33 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020042
Georg Brandl2daf6ae2012-02-20 19:54:16 +010043from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000044
Victor Stinner034d0aa2012-06-05 01:22:15 +020045with warnings.catch_warnings():
46 warnings.simplefilter("ignore", DeprecationWarning)
47 os.stat_float_times(True)
Victor Stinner1aa54a42012-02-08 04:09:37 +010048st = os.stat(__file__)
49stat_supports_subsecond = (
50 # check if float and int timestamps are different
51 (st.st_atime != st[7])
52 or (st.st_mtime != st[8])
53 or (st.st_ctime != st[9]))
54
Mark Dickinson7cf03892010-04-16 13:45:35 +000055# Detect whether we're on a Linux system that uses the (now outdated
56# and unmaintained) linuxthreads threading library. There's an issue
57# when combining linuxthreads with a failed execv call: see
58# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020059if hasattr(sys, 'thread_info') and sys.thread_info.version:
60 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
61else:
62 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000063
Stefan Krahebee49a2013-01-17 15:31:00 +010064# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
65HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
66
Thomas Wouters0e3f5912006-08-11 14:57:12 +000067# Tests creating TESTFN
68class FileTests(unittest.TestCase):
69 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000070 if os.path.exists(support.TESTFN):
71 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000072 tearDown = setUp
73
74 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000075 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000076 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000077 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000078
Christian Heimesfdab48e2008-01-20 09:06:41 +000079 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000080 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
81 # We must allocate two consecutive file descriptors, otherwise
82 # it will mess up other file descriptors (perhaps even the three
83 # standard ones).
84 second = os.dup(first)
85 try:
86 retries = 0
87 while second != first + 1:
88 os.close(first)
89 retries += 1
90 if retries > 10:
91 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000092 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000093 first, second = second, os.dup(second)
94 finally:
95 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000096 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000097 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000098 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000099
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000100 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000101 def test_rename(self):
102 path = support.TESTFN
103 old = sys.getrefcount(path)
104 self.assertRaises(TypeError, os.rename, path, 0)
105 new = sys.getrefcount(path)
106 self.assertEqual(old, new)
107
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000108 def test_read(self):
109 with open(support.TESTFN, "w+b") as fobj:
110 fobj.write(b"spam")
111 fobj.flush()
112 fd = fobj.fileno()
113 os.lseek(fd, 0, 0)
114 s = os.read(fd, 4)
115 self.assertEqual(type(s), bytes)
116 self.assertEqual(s, b"spam")
117
118 def test_write(self):
119 # os.write() accepts bytes- and buffer-like objects but not strings
120 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
121 self.assertRaises(TypeError, os.write, fd, "beans")
122 os.write(fd, b"bacon\n")
123 os.write(fd, bytearray(b"eggs\n"))
124 os.write(fd, memoryview(b"spam\n"))
125 os.close(fd)
126 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000127 self.assertEqual(fobj.read().splitlines(),
128 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000129
Victor Stinnere0daff12011-03-20 23:36:35 +0100130 def write_windows_console(self, *args):
131 retcode = subprocess.call(args,
132 # use a new console to not flood the test output
133 creationflags=subprocess.CREATE_NEW_CONSOLE,
134 # use a shell to hide the console window (SW_HIDE)
135 shell=True)
136 self.assertEqual(retcode, 0)
137
138 @unittest.skipUnless(sys.platform == 'win32',
139 'test specific to the Windows console')
140 def test_write_windows_console(self):
141 # Issue #11395: the Windows console returns an error (12: not enough
142 # space error) on writing into stdout if stdout mode is binary and the
143 # length is greater than 66,000 bytes (or less, depending on heap
144 # usage).
145 code = "print('x' * 100000)"
146 self.write_windows_console(sys.executable, "-c", code)
147 self.write_windows_console(sys.executable, "-u", "-c", code)
148
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000149 def fdopen_helper(self, *args):
150 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200151 f = os.fdopen(fd, *args)
152 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000153
154 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200155 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
156 os.close(fd)
157
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000158 self.fdopen_helper()
159 self.fdopen_helper('r')
160 self.fdopen_helper('r', 100)
161
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100162 def test_replace(self):
163 TESTFN2 = support.TESTFN + ".2"
164 with open(support.TESTFN, 'w') as f:
165 f.write("1")
166 with open(TESTFN2, 'w') as f:
167 f.write("2")
168 self.addCleanup(os.unlink, TESTFN2)
169 os.replace(support.TESTFN, TESTFN2)
170 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
171 with open(TESTFN2, 'r') as f:
172 self.assertEqual(f.read(), "1")
173
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200174
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000175# Test attributes on return values from os.*stat* family.
176class StatAttributeTests(unittest.TestCase):
177 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000178 os.mkdir(support.TESTFN)
179 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000180 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000181 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000182 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000183
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000184 def tearDown(self):
185 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000186 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000187
Serhiy Storchaka43767632013-11-03 21:31:38 +0200188 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000189 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000190 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000191
192 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000193 self.assertEqual(result[stat.ST_SIZE], 3)
194 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000195
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000196 # Make sure all the attributes are there
197 members = dir(result)
198 for name in dir(stat):
199 if name[:3] == 'ST_':
200 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000201 if name.endswith("TIME"):
202 def trunc(x): return int(x)
203 else:
204 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000205 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000206 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000207 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000208
Larry Hastings6fe20b32012-04-19 15:07:49 -0700209 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700210 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700211 for name in 'st_atime st_mtime st_ctime'.split():
212 floaty = int(getattr(result, name) * 100000)
213 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700214 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700215
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000216 try:
217 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200218 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000219 except IndexError:
220 pass
221
222 # Make sure that assignment fails
223 try:
224 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200225 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000226 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000227 pass
228
229 try:
230 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200231 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000232 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000233 pass
234
235 try:
236 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200237 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000238 except AttributeError:
239 pass
240
241 # Use the stat_result constructor with a too-short tuple.
242 try:
243 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200244 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000245 except TypeError:
246 pass
247
Ezio Melotti42da6632011-03-15 05:18:48 +0200248 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249 try:
250 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
251 except TypeError:
252 pass
253
Antoine Pitrou38425292010-09-21 18:19:07 +0000254 def test_stat_attributes(self):
255 self.check_stat_attributes(self.fname)
256
257 def test_stat_attributes_bytes(self):
258 try:
259 fname = self.fname.encode(sys.getfilesystemencoding())
260 except UnicodeEncodeError:
261 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100262 with warnings.catch_warnings():
263 warnings.simplefilter("ignore", DeprecationWarning)
264 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000265
Christian Heimes25827622013-10-12 01:27:08 +0200266 def test_stat_result_pickle(self):
267 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200268 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
269 p = pickle.dumps(result, proto)
270 self.assertIn(b'stat_result', p)
271 if proto < 4:
272 self.assertIn(b'cos\nstat_result\n', p)
273 unpickled = pickle.loads(p)
274 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200275
Serhiy Storchaka43767632013-11-03 21:31:38 +0200276 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000277 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000278 try:
279 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000280 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000281 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000282 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200283 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284
285 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000286 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000287
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000288 # Make sure all the attributes are there.
289 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
290 'ffree', 'favail', 'flag', 'namemax')
291 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000292 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000293
294 # Make sure that assignment really fails
295 try:
296 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200297 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000298 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000299 pass
300
301 try:
302 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200303 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000304 except AttributeError:
305 pass
306
307 # Use the constructor with a too-short tuple.
308 try:
309 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200310 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000311 except TypeError:
312 pass
313
Ezio Melotti42da6632011-03-15 05:18:48 +0200314 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000315 try:
316 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
317 except TypeError:
318 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000319
Christian Heimes25827622013-10-12 01:27:08 +0200320 @unittest.skipUnless(hasattr(os, 'statvfs'),
321 "need os.statvfs()")
322 def test_statvfs_result_pickle(self):
323 try:
324 result = os.statvfs(self.fname)
325 except OSError as e:
326 # On AtheOS, glibc always returns ENOSYS
327 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200328 self.skipTest('os.statvfs() failed with ENOSYS')
329
Serhiy Storchakabad12572014-12-15 14:03:42 +0200330 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
331 p = pickle.dumps(result, proto)
332 self.assertIn(b'statvfs_result', p)
333 if proto < 4:
334 self.assertIn(b'cos\nstatvfs_result\n', p)
335 unpickled = pickle.loads(p)
336 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200337
Thomas Wouters89f507f2006-12-13 04:49:30 +0000338 def test_utime_dir(self):
339 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000340 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000341 # round to int, because some systems may support sub-second
342 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000343 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
344 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000345 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000346
Larry Hastings76ad59b2012-05-03 00:30:07 -0700347 def _test_utime(self, filename, attr, utime, delta):
Brian Curtin0277aa32011-11-06 13:50:15 -0600348 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600349 # second argument. Check that the previous methods of passing
350 # a time tuple or None work in addition to no argument.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700351 st0 = os.stat(filename)
Brian Curtin52fbea12011-11-06 13:41:17 -0600352 # Doesn't set anything new, but sets the time tuple way
Larry Hastings76ad59b2012-05-03 00:30:07 -0700353 utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
354 # Setting the time to the time you just read, then reading again,
355 # should always return exactly the same times.
356 st1 = os.stat(filename)
357 self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
358 self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
Brian Curtin52fbea12011-11-06 13:41:17 -0600359 # Set to the current time in the old explicit way.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700360 os.utime(filename, None)
Brian Curtin52fbea12011-11-06 13:41:17 -0600361 st2 = os.stat(support.TESTFN)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700362 # Set to the current time in the new way
363 os.utime(filename)
364 st3 = os.stat(filename)
365 self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)
366
367 def test_utime(self):
368 def utime(file, times):
369 return os.utime(file, times)
370 self._test_utime(self.fname, getattr, utime, 10)
371 self._test_utime(support.TESTFN, getattr, utime, 10)
372
373
374 def _test_utime_ns(self, set_times_ns, test_dir=True):
375 def getattr_ns(o, attr):
376 return getattr(o, attr + "_ns")
377 ten_s = 10 * 1000 * 1000 * 1000
378 self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
379 if test_dir:
380 self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)
381
382 def test_utime_ns(self):
383 def utime_ns(file, times):
384 return os.utime(file, ns=times)
385 self._test_utime_ns(utime_ns)
386
Larry Hastings9cf065c2012-06-22 16:30:09 -0700387 requires_utime_dir_fd = unittest.skipUnless(
388 os.utime in os.supports_dir_fd,
389 "dir_fd support for utime required for this test.")
390 requires_utime_fd = unittest.skipUnless(
391 os.utime in os.supports_fd,
392 "fd support for utime required for this test.")
393 requires_utime_nofollow_symlinks = unittest.skipUnless(
394 os.utime in os.supports_follow_symlinks,
395 "follow_symlinks support for utime required for this test.")
Larry Hastings76ad59b2012-05-03 00:30:07 -0700396
Larry Hastings9cf065c2012-06-22 16:30:09 -0700397 @requires_utime_nofollow_symlinks
Larry Hastings76ad59b2012-05-03 00:30:07 -0700398 def test_lutimes_ns(self):
399 def lutimes_ns(file, times):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700400 return os.utime(file, ns=times, follow_symlinks=False)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700401 self._test_utime_ns(lutimes_ns)
402
Larry Hastings9cf065c2012-06-22 16:30:09 -0700403 @requires_utime_fd
Larry Hastings76ad59b2012-05-03 00:30:07 -0700404 def test_futimes_ns(self):
405 def futimes_ns(file, times):
406 with open(file, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700407 os.utime(f.fileno(), ns=times)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700408 self._test_utime_ns(futimes_ns, test_dir=False)
409
410 def _utime_invalid_arguments(self, name, arg):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700411 with self.assertRaises(ValueError):
Larry Hastings76ad59b2012-05-03 00:30:07 -0700412 getattr(os, name)(arg, (5, 5), ns=(5, 5))
413
414 def test_utime_invalid_arguments(self):
415 self._utime_invalid_arguments('utime', self.fname)
416
Brian Curtin52fbea12011-11-06 13:41:17 -0600417
Victor Stinner1aa54a42012-02-08 04:09:37 +0100418 @unittest.skipUnless(stat_supports_subsecond,
419 "os.stat() doesn't has a subsecond resolution")
Victor Stinnera2f7c002012-02-08 03:36:25 +0100420 def _test_utime_subsecond(self, set_time_func):
Victor Stinnerbe557de2012-02-08 03:01:11 +0100421 asec, amsec = 1, 901
422 atime = asec + amsec * 1e-3
Victor Stinnera2f7c002012-02-08 03:36:25 +0100423 msec, mmsec = 2, 901
Victor Stinnerbe557de2012-02-08 03:01:11 +0100424 mtime = msec + mmsec * 1e-3
425 filename = self.fname
Victor Stinnera2f7c002012-02-08 03:36:25 +0100426 os.utime(filename, (0, 0))
427 set_time_func(filename, atime, mtime)
Victor Stinner034d0aa2012-06-05 01:22:15 +0200428 with warnings.catch_warnings():
429 warnings.simplefilter("ignore", DeprecationWarning)
430 os.stat_float_times(True)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100431 st = os.stat(filename)
432 self.assertAlmostEqual(st.st_atime, atime, places=3)
433 self.assertAlmostEqual(st.st_mtime, mtime, places=3)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100434
Victor Stinnera2f7c002012-02-08 03:36:25 +0100435 def test_utime_subsecond(self):
436 def set_time(filename, atime, mtime):
437 os.utime(filename, (atime, mtime))
438 self._test_utime_subsecond(set_time)
439
Larry Hastings9cf065c2012-06-22 16:30:09 -0700440 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100441 def test_futimes_subsecond(self):
442 def set_time(filename, atime, mtime):
443 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700444 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100445 self._test_utime_subsecond(set_time)
446
Larry Hastings9cf065c2012-06-22 16:30:09 -0700447 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100448 def test_futimens_subsecond(self):
449 def set_time(filename, atime, mtime):
450 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700451 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100452 self._test_utime_subsecond(set_time)
453
Larry Hastings9cf065c2012-06-22 16:30:09 -0700454 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100455 def test_futimesat_subsecond(self):
456 def set_time(filename, atime, mtime):
457 dirname = os.path.dirname(filename)
458 dirfd = os.open(dirname, os.O_RDONLY)
459 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700460 os.utime(os.path.basename(filename), dir_fd=dirfd,
461 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100462 finally:
463 os.close(dirfd)
464 self._test_utime_subsecond(set_time)
465
Larry Hastings9cf065c2012-06-22 16:30:09 -0700466 @requires_utime_nofollow_symlinks
Victor Stinnera2f7c002012-02-08 03:36:25 +0100467 def test_lutimes_subsecond(self):
468 def set_time(filename, atime, mtime):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700469 os.utime(filename, (atime, mtime), follow_symlinks=False)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100470 self._test_utime_subsecond(set_time)
471
Larry Hastings9cf065c2012-06-22 16:30:09 -0700472 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100473 def test_utimensat_subsecond(self):
474 def set_time(filename, atime, mtime):
475 dirname = os.path.dirname(filename)
476 dirfd = os.open(dirname, os.O_RDONLY)
477 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700478 os.utime(os.path.basename(filename), dir_fd=dirfd,
479 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100480 finally:
481 os.close(dirfd)
482 self._test_utime_subsecond(set_time)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100483
Serhiy Storchaka43767632013-11-03 21:31:38 +0200484 # Restrict tests to Win32, since there is no guarantee other
Thomas Wouters89f507f2006-12-13 04:49:30 +0000485 # systems support centiseconds
Serhiy Storchaka43767632013-11-03 21:31:38 +0200486 def get_file_system(path):
487 if sys.platform == 'win32':
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000488 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000489 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000490 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000491 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000492 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000493 return buf.value
494
Serhiy Storchaka43767632013-11-03 21:31:38 +0200495 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
496 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
497 "requires NTFS")
498 def test_1565150(self):
499 t1 = 1159195039.25
500 os.utime(self.fname, (t1, t1))
501 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000502
Serhiy Storchaka43767632013-11-03 21:31:38 +0200503 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
504 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
505 "requires NTFS")
506 def test_large_time(self):
507 t1 = 5000000000 # some day in 2128
508 os.utime(self.fname, (t1, t1))
509 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000510
Serhiy Storchaka43767632013-11-03 21:31:38 +0200511 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
512 def test_1686475(self):
513 # Verify that an open file can be stat'ed
514 try:
515 os.stat(r"c:\pagefile.sys")
516 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600517 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200518 except OSError as e:
519 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000520
Serhiy Storchaka43767632013-11-03 21:31:38 +0200521 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
522 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
523 def test_15261(self):
524 # Verify that stat'ing a closed fd does not cause crash
525 r, w = os.pipe()
526 try:
527 os.stat(r) # should not raise error
528 finally:
529 os.close(r)
530 os.close(w)
531 with self.assertRaises(OSError) as ctx:
532 os.stat(r)
533 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100534
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000535from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000536
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000537class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000538 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000539 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000540
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000541 def setUp(self):
542 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000543 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000544 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000545 for key, value in self._reference().items():
546 os.environ[key] = value
547
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000548 def tearDown(self):
549 os.environ.clear()
550 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000551 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000552 os.environb.clear()
553 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000554
Christian Heimes90333392007-11-01 19:08:42 +0000555 def _reference(self):
556 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
557
558 def _empty_mapping(self):
559 os.environ.clear()
560 return os.environ
561
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000562 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300563 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000564 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000565 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300566 os.environ.update(HELLO="World")
567 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
568 value = popen.read().strip()
569 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000570
Ezio Melottic7e139b2012-09-26 20:01:34 +0300571 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000572 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300573 with os.popen(
574 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
575 it = iter(popen)
576 self.assertEqual(next(it), "line1\n")
577 self.assertEqual(next(it), "line2\n")
578 self.assertEqual(next(it), "line3\n")
579 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000580
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000581 # Verify environ keys and values from the OS are of the
582 # correct str type.
583 def test_keyvalue_types(self):
584 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000585 self.assertEqual(type(key), str)
586 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000587
Christian Heimes90333392007-11-01 19:08:42 +0000588 def test_items(self):
589 for key, value in self._reference().items():
590 self.assertEqual(os.environ.get(key), value)
591
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000592 # Issue 7310
593 def test___repr__(self):
594 """Check that the repr() of os.environ looks like environ({...})."""
595 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000596 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
597 '{!r}: {!r}'.format(key, value)
598 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000599
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000600 def test_get_exec_path(self):
601 defpath_list = os.defpath.split(os.pathsep)
602 test_path = ['/monty', '/python', '', '/flying/circus']
603 test_env = {'PATH': os.pathsep.join(test_path)}
604
605 saved_environ = os.environ
606 try:
607 os.environ = dict(test_env)
608 # Test that defaulting to os.environ works.
609 self.assertSequenceEqual(test_path, os.get_exec_path())
610 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
611 finally:
612 os.environ = saved_environ
613
614 # No PATH environment variable
615 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
616 # Empty PATH environment variable
617 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
618 # Supplied PATH environment variable
619 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
620
Victor Stinnerb745a742010-05-18 17:17:23 +0000621 if os.supports_bytes_environ:
622 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000623 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000624 # ignore BytesWarning warning
625 with warnings.catch_warnings(record=True):
626 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000627 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000628 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000629 pass
630 else:
631 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000632
633 # bytes key and/or value
634 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
635 ['abc'])
636 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
637 ['abc'])
638 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
639 ['abc'])
640
641 @unittest.skipUnless(os.supports_bytes_environ,
642 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000643 def test_environb(self):
644 # os.environ -> os.environb
645 value = 'euro\u20ac'
646 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000647 value_bytes = value.encode(sys.getfilesystemencoding(),
648 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000649 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000650 msg = "U+20AC character is not encodable to %s" % (
651 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000652 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000653 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000654 self.assertEqual(os.environ['unicode'], value)
655 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000656
657 # os.environb -> os.environ
658 value = b'\xff'
659 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000660 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000661 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000662 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000663
Charles-François Natali2966f102011-11-26 11:32:46 +0100664 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
665 # #13415).
666 @support.requires_freebsd_version(7)
667 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100668 def test_unset_error(self):
669 if sys.platform == "win32":
670 # an environment variable is limited to 32,767 characters
671 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100672 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100673 else:
674 # "=" is not allowed in a variable name
675 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100676 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100677
Victor Stinner6d101392013-04-14 16:35:04 +0200678 def test_key_type(self):
679 missing = 'missingkey'
680 self.assertNotIn(missing, os.environ)
681
Victor Stinner839e5ea2013-04-14 16:43:03 +0200682 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200683 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200684 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200685 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200686
Victor Stinner839e5ea2013-04-14 16:43:03 +0200687 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200688 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200689 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200690 self.assertTrue(cm.exception.__suppress_context__)
691
Victor Stinner6d101392013-04-14 16:35:04 +0200692
Tim Petersc4e09402003-04-25 07:11:48 +0000693class WalkTests(unittest.TestCase):
694 """Tests for os.walk()."""
695
Charles-François Natali7372b062012-02-05 15:15:38 +0100696 def setUp(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000697 import os
698 from os.path import join
699
700 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000701 # TESTFN/
702 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000703 # tmp1
704 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000705 # tmp2
706 # SUB11/ no kids
707 # SUB2/ a file kid and a dirsymlink kid
708 # tmp3
709 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200710 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000711 # TEST2/
712 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000713 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000714 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000715 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000716 sub2_path = join(walk_path, "SUB2")
717 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000718 tmp2_path = join(sub1_path, "tmp2")
719 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000720 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000721 t2_path = join(support.TESTFN, "TEST2")
722 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200723 link_path = join(sub2_path, "link")
724 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000725
726 # Create stuff.
727 os.makedirs(sub11_path)
728 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000729 os.makedirs(t2_path)
730 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000731 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000732 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
733 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000734 if support.can_symlink():
Jason R. Coombs3a092862013-05-27 23:21:28 -0400735 os.symlink(os.path.abspath(t2_path), link_path)
Jason R. Coombsb501b562013-05-27 23:52:43 -0400736 os.symlink('broken', broken_link_path, True)
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200737 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
Guido van Rossumd8faa362007-04-27 19:54:29 +0000738 else:
739 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000740
741 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000742 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000743 self.assertEqual(len(all), 4)
744 # We can't know which order SUB1 and SUB2 will appear in.
745 # Not flipped: TESTFN, SUB1, SUB11, SUB2
746 # flipped: TESTFN, SUB2, SUB1, SUB11
747 flipped = all[0][1][0] != "SUB1"
748 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200749 all[3 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000750 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000751 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
752 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000753 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000754
755 # Prune the search.
756 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000757 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000758 all.append((root, dirs, files))
759 # Don't descend into SUB1.
760 if 'SUB1' in dirs:
761 # Note that this also mutates the dirs we appended to all!
762 dirs.remove('SUB1')
763 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000764 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200765 all[1][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000766 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000767
768 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000769 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000770 self.assertEqual(len(all), 4)
771 # We can't know which order SUB1 and SUB2 will appear in.
772 # Not flipped: SUB11, SUB1, SUB2, TESTFN
773 # flipped: SUB2, SUB11, SUB1, TESTFN
774 flipped = all[3][1][0] != "SUB1"
775 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200776 all[2 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000777 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000778 self.assertEqual(all[flipped], (sub11_path, [], []))
779 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000780 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000781
Brian Curtin3b4499c2010-12-28 14:31:47 +0000782 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000783 # Walk, following symlinks.
784 for root, dirs, files in os.walk(walk_path, followlinks=True):
785 if root == link_path:
786 self.assertEqual(dirs, [])
787 self.assertEqual(files, ["tmp4"])
788 break
789 else:
790 self.fail("Didn't follow symlink with followlinks=True")
791
792 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000793 # Tear everything down. This is a decent use for bottom-up on
794 # Windows, which doesn't have a recursive delete command. The
795 # (not so) subtlety is that rmdir will fail unless the dir's
796 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000797 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000798 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000799 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000800 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000801 dirname = os.path.join(root, name)
802 if not os.path.islink(dirname):
803 os.rmdir(dirname)
804 else:
805 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000806 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000807
Charles-François Natali7372b062012-02-05 15:15:38 +0100808
809@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
810class FwalkTests(WalkTests):
811 """Tests for os.fwalk()."""
812
Larry Hastingsc48fe982012-06-25 04:49:05 -0700813 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
814 """
815 compare with walk() results.
816 """
Larry Hastingsb4038062012-07-15 10:57:38 -0700817 walk_kwargs = walk_kwargs.copy()
818 fwalk_kwargs = fwalk_kwargs.copy()
819 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
820 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
821 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -0700822
Charles-François Natali7372b062012-02-05 15:15:38 +0100823 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -0700824 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100825 expected[root] = (set(dirs), set(files))
826
Larry Hastingsc48fe982012-06-25 04:49:05 -0700827 for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100828 self.assertIn(root, expected)
829 self.assertEqual(expected[root], (set(dirs), set(files)))
830
Larry Hastingsc48fe982012-06-25 04:49:05 -0700831 def test_compare_to_walk(self):
832 kwargs = {'top': support.TESTFN}
833 self._compare_to_walk(kwargs, kwargs)
834
Charles-François Natali7372b062012-02-05 15:15:38 +0100835 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -0700836 try:
837 fd = os.open(".", os.O_RDONLY)
838 walk_kwargs = {'top': support.TESTFN}
839 fwalk_kwargs = walk_kwargs.copy()
840 fwalk_kwargs['dir_fd'] = fd
841 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
842 finally:
843 os.close(fd)
844
845 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +0100846 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -0700847 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
848 args = support.TESTFN, topdown, None
849 for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +0100850 # check that the FD is valid
851 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700852 # redundant check
853 os.stat(rootfd)
854 # check that listdir() returns consistent information
855 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100856
857 def test_fd_leak(self):
858 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
859 # we both check that calling fwalk() a large number of times doesn't
860 # yield EMFILE, and that the minimum allocated FD hasn't changed.
861 minfd = os.dup(1)
862 os.close(minfd)
863 for i in range(256):
864 for x in os.fwalk(support.TESTFN):
865 pass
866 newfd = os.dup(1)
867 self.addCleanup(os.close, newfd)
868 self.assertEqual(newfd, minfd)
869
870 def tearDown(self):
871 # cleanup
872 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
873 for name in files:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700874 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100875 for name in dirs:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700876 st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
Larry Hastingsb698d8e2012-06-23 16:55:07 -0700877 if stat.S_ISDIR(st.st_mode):
878 os.rmdir(name, dir_fd=rootfd)
879 else:
880 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100881 os.rmdir(support.TESTFN)
882
883
Guido van Rossume7ba4952007-06-06 23:52:48 +0000884class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000885 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000886 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000887
888 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000889 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000890 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
891 os.makedirs(path) # Should work
892 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
893 os.makedirs(path)
894
895 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000896 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000897 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
898 os.makedirs(path)
899 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
900 'dir5', 'dir6')
901 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000902
Terry Reedy5a22b652010-12-02 07:05:56 +0000903 def test_exist_ok_existing_directory(self):
904 path = os.path.join(support.TESTFN, 'dir1')
905 mode = 0o777
906 old_mask = os.umask(0o022)
907 os.makedirs(path, mode)
908 self.assertRaises(OSError, os.makedirs, path, mode)
909 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -0400910 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +0000911 os.makedirs(path, mode=mode, exist_ok=True)
912 os.umask(old_mask)
913
Terry Jan Reedy4a0b6f72013-08-10 20:58:59 -0400914 @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
Larry Hastingsa27b83a2013-08-08 00:19:50 -0700915 def test_chown_uid_gid_arguments_must_be_index(self):
916 stat = os.stat(support.TESTFN)
917 uid = stat.st_uid
918 gid = stat.st_gid
919 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
920 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
921 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
922 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
923 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
924
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700925 def test_exist_ok_s_isgid_directory(self):
926 path = os.path.join(support.TESTFN, 'dir1')
927 S_ISGID = stat.S_ISGID
928 mode = 0o777
929 old_mask = os.umask(0o022)
930 try:
931 existing_testfn_mode = stat.S_IMODE(
932 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -0700933 try:
934 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -0700935 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -0700936 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700937 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
938 raise unittest.SkipTest('No support for S_ISGID dir mode.')
939 # The os should apply S_ISGID from the parent dir for us, but
940 # this test need not depend on that behavior. Be explicit.
941 os.makedirs(path, mode | S_ISGID)
942 # http://bugs.python.org/issue14992
943 # Should not fail when the bit is already set.
944 os.makedirs(path, mode, exist_ok=True)
945 # remove the bit.
946 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -0400947 # May work even when the bit is not already set when demanded.
948 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700949 finally:
950 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +0000951
952 def test_exist_ok_existing_regular_file(self):
953 base = support.TESTFN
954 path = os.path.join(support.TESTFN, 'dir1')
955 f = open(path, 'w')
956 f.write('abc')
957 f.close()
958 self.assertRaises(OSError, os.makedirs, path)
959 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
960 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
961 os.remove(path)
962
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000963 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000964 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000965 'dir4', 'dir5', 'dir6')
966 # If the tests failed, the bottom-most directory ('../dir6')
967 # may not have been created, so we look for the outermost directory
968 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000969 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000970 path = os.path.dirname(path)
971
972 os.removedirs(path)
973
Andrew Svetlov405faed2012-12-25 12:18:09 +0200974
975class RemoveDirsTests(unittest.TestCase):
976 def setUp(self):
977 os.makedirs(support.TESTFN)
978
979 def tearDown(self):
980 support.rmtree(support.TESTFN)
981
982 def test_remove_all(self):
983 dira = os.path.join(support.TESTFN, 'dira')
984 os.mkdir(dira)
985 dirb = os.path.join(dira, 'dirb')
986 os.mkdir(dirb)
987 os.removedirs(dirb)
988 self.assertFalse(os.path.exists(dirb))
989 self.assertFalse(os.path.exists(dira))
990 self.assertFalse(os.path.exists(support.TESTFN))
991
992 def test_remove_partial(self):
993 dira = os.path.join(support.TESTFN, 'dira')
994 os.mkdir(dira)
995 dirb = os.path.join(dira, 'dirb')
996 os.mkdir(dirb)
997 with open(os.path.join(dira, 'file.txt'), 'w') as f:
998 f.write('text')
999 os.removedirs(dirb)
1000 self.assertFalse(os.path.exists(dirb))
1001 self.assertTrue(os.path.exists(dira))
1002 self.assertTrue(os.path.exists(support.TESTFN))
1003
1004 def test_remove_nothing(self):
1005 dira = os.path.join(support.TESTFN, 'dira')
1006 os.mkdir(dira)
1007 dirb = os.path.join(dira, 'dirb')
1008 os.mkdir(dirb)
1009 with open(os.path.join(dirb, 'file.txt'), 'w') as f:
1010 f.write('text')
1011 with self.assertRaises(OSError):
1012 os.removedirs(dirb)
1013 self.assertTrue(os.path.exists(dirb))
1014 self.assertTrue(os.path.exists(dira))
1015 self.assertTrue(os.path.exists(support.TESTFN))
1016
1017
Guido van Rossume7ba4952007-06-06 23:52:48 +00001018class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001019 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +02001020 with open(os.devnull, 'wb') as f:
1021 f.write(b'hello')
1022 f.close()
1023 with open(os.devnull, 'rb') as f:
1024 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001025
Andrew Svetlov405faed2012-12-25 12:18:09 +02001026
Guido van Rossume7ba4952007-06-06 23:52:48 +00001027class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001028 def test_urandom_length(self):
1029 self.assertEqual(len(os.urandom(0)), 0)
1030 self.assertEqual(len(os.urandom(1)), 1)
1031 self.assertEqual(len(os.urandom(10)), 10)
1032 self.assertEqual(len(os.urandom(100)), 100)
1033 self.assertEqual(len(os.urandom(1000)), 1000)
1034
1035 def test_urandom_value(self):
1036 data1 = os.urandom(16)
1037 data2 = os.urandom(16)
1038 self.assertNotEqual(data1, data2)
1039
1040 def get_urandom_subprocess(self, count):
1041 code = '\n'.join((
1042 'import os, sys',
1043 'data = os.urandom(%s)' % count,
1044 'sys.stdout.buffer.write(data)',
1045 'sys.stdout.buffer.flush()'))
1046 out = assert_python_ok('-c', code)
1047 stdout = out[1]
1048 self.assertEqual(len(stdout), 16)
1049 return stdout
1050
1051 def test_urandom_subprocess(self):
1052 data1 = self.get_urandom_subprocess(16)
1053 data2 = self.get_urandom_subprocess(16)
1054 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001055
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001056 @unittest.skipUnless(resource, "test requires the resource module")
1057 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001058 # Check urandom() failing when it is not able to open /dev/random.
1059 # We spawn a new process to make the test more robust (if getrlimit()
1060 # failed to restore the file descriptor limit after this, the whole
1061 # test suite would crash; this actually happened on the OS X Tiger
1062 # buildbot).
1063 code = """if 1:
1064 import errno
1065 import os
1066 import resource
1067
1068 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1069 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1070 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001071 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001072 except OSError as e:
1073 assert e.errno == errno.EMFILE, e.errno
1074 else:
1075 raise AssertionError("OSError not raised")
1076 """
1077 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001078
Antoine Pitroue472aea2014-04-26 14:33:03 +02001079 def test_urandom_fd_closed(self):
1080 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1081 # closed.
1082 code = """if 1:
1083 import os
1084 import sys
1085 os.urandom(4)
1086 os.closerange(3, 256)
1087 sys.stdout.buffer.write(os.urandom(4))
1088 """
1089 rc, out, err = assert_python_ok('-Sc', code)
1090
1091 def test_urandom_fd_reopened(self):
1092 # Issue #21207: urandom() should detect its fd to /dev/urandom
1093 # changed to something else, and reopen it.
1094 with open(support.TESTFN, 'wb') as f:
1095 f.write(b"x" * 256)
1096 self.addCleanup(os.unlink, support.TESTFN)
1097 code = """if 1:
1098 import os
1099 import sys
1100 os.urandom(4)
1101 for fd in range(3, 256):
1102 try:
1103 os.close(fd)
1104 except OSError:
1105 pass
1106 else:
1107 # Found the urandom fd (XXX hopefully)
1108 break
1109 os.closerange(3, 256)
1110 with open({TESTFN!r}, 'rb') as f:
1111 os.dup2(f.fileno(), fd)
1112 sys.stdout.buffer.write(os.urandom(4))
1113 sys.stdout.buffer.write(os.urandom(4))
1114 """.format(TESTFN=support.TESTFN)
1115 rc, out, err = assert_python_ok('-Sc', code)
1116 self.assertEqual(len(out), 8)
1117 self.assertNotEqual(out[0:4], out[4:8])
1118 rc, out2, err2 = assert_python_ok('-Sc', code)
1119 self.assertEqual(len(out2), 8)
1120 self.assertNotEqual(out2, out)
1121
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001122
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001123@contextlib.contextmanager
1124def _execvpe_mockup(defpath=None):
1125 """
1126 Stubs out execv and execve functions when used as context manager.
1127 Records exec calls. The mock execv and execve functions always raise an
1128 exception as they would normally never return.
1129 """
1130 # A list of tuples containing (function name, first arg, args)
1131 # of calls to execv or execve that have been made.
1132 calls = []
1133
1134 def mock_execv(name, *args):
1135 calls.append(('execv', name, args))
1136 raise RuntimeError("execv called")
1137
1138 def mock_execve(name, *args):
1139 calls.append(('execve', name, args))
1140 raise OSError(errno.ENOTDIR, "execve called")
1141
1142 try:
1143 orig_execv = os.execv
1144 orig_execve = os.execve
1145 orig_defpath = os.defpath
1146 os.execv = mock_execv
1147 os.execve = mock_execve
1148 if defpath is not None:
1149 os.defpath = defpath
1150 yield calls
1151 finally:
1152 os.execv = orig_execv
1153 os.execve = orig_execve
1154 os.defpath = orig_defpath
1155
Guido van Rossume7ba4952007-06-06 23:52:48 +00001156class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001157 @unittest.skipIf(USING_LINUXTHREADS,
1158 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001159 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001160 self.assertRaises(OSError, os.execvpe, 'no such app-',
1161 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001162
Thomas Heller6790d602007-08-30 17:15:14 +00001163 def test_execvpe_with_bad_arglist(self):
1164 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1165
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001166 @unittest.skipUnless(hasattr(os, '_execvpe'),
1167 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001168 def _test_internal_execvpe(self, test_type):
1169 program_path = os.sep + 'absolutepath'
1170 if test_type is bytes:
1171 program = b'executable'
1172 fullpath = os.path.join(os.fsencode(program_path), program)
1173 native_fullpath = fullpath
1174 arguments = [b'progname', 'arg1', 'arg2']
1175 else:
1176 program = 'executable'
1177 arguments = ['progname', 'arg1', 'arg2']
1178 fullpath = os.path.join(program_path, program)
1179 if os.name != "nt":
1180 native_fullpath = os.fsencode(fullpath)
1181 else:
1182 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001183 env = {'spam': 'beans'}
1184
Victor Stinnerb745a742010-05-18 17:17:23 +00001185 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001186 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001187 self.assertRaises(RuntimeError,
1188 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001189 self.assertEqual(len(calls), 1)
1190 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1191
Victor Stinnerb745a742010-05-18 17:17:23 +00001192 # test os._execvpe() with a relative path:
1193 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001194 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001195 self.assertRaises(OSError,
1196 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001197 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001198 self.assertSequenceEqual(calls[0],
1199 ('execve', native_fullpath, (arguments, env)))
1200
1201 # test os._execvpe() with a relative path:
1202 # os.get_exec_path() reads the 'PATH' variable
1203 with _execvpe_mockup() as calls:
1204 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001205 if test_type is bytes:
1206 env_path[b'PATH'] = program_path
1207 else:
1208 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001209 self.assertRaises(OSError,
1210 os._execvpe, program, arguments, env=env_path)
1211 self.assertEqual(len(calls), 1)
1212 self.assertSequenceEqual(calls[0],
1213 ('execve', native_fullpath, (arguments, env_path)))
1214
1215 def test_internal_execvpe_str(self):
1216 self._test_internal_execvpe(str)
1217 if os.name != "nt":
1218 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001219
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001220
Serhiy Storchaka43767632013-11-03 21:31:38 +02001221@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001222class Win32ErrorTests(unittest.TestCase):
1223 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001224 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001225
1226 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001227 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001228
1229 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001230 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001231
1232 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001233 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001234 try:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001235 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Benjamin Petersonf91df042009-02-13 02:50:59 +00001236 finally:
1237 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001238 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001239
1240 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001241 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001242
Thomas Wouters477c8d52006-05-27 19:21:47 +00001243 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001244 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001245
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001246class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001247 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001248 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1249 #singles.append("close")
1250 #We omit close because it doesn'r raise an exception on some platforms
1251 def get_single(f):
1252 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001253 if hasattr(os, f):
1254 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001255 return helper
1256 for f in singles:
1257 locals()["test_"+f] = get_single(f)
1258
Benjamin Peterson7522c742009-01-19 21:00:09 +00001259 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001260 try:
1261 f(support.make_bad_fd(), *args)
1262 except OSError as e:
1263 self.assertEqual(e.errno, errno.EBADF)
1264 else:
1265 self.fail("%r didn't raise a OSError with a bad file descriptor"
1266 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001267
Serhiy Storchaka43767632013-11-03 21:31:38 +02001268 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001269 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001270 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001271
Serhiy Storchaka43767632013-11-03 21:31:38 +02001272 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001273 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001274 fd = support.make_bad_fd()
1275 # Make sure none of the descriptors we are about to close are
1276 # currently valid (issue 6542).
1277 for i in range(10):
1278 try: os.fstat(fd+i)
1279 except OSError:
1280 pass
1281 else:
1282 break
1283 if i < 2:
1284 raise unittest.SkipTest(
1285 "Unable to acquire a range of invalid file descriptors")
1286 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001287
Serhiy Storchaka43767632013-11-03 21:31:38 +02001288 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001289 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001290 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001291
Serhiy Storchaka43767632013-11-03 21:31:38 +02001292 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001293 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001294 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001295
Serhiy Storchaka43767632013-11-03 21:31:38 +02001296 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001297 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001298 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001299
Serhiy Storchaka43767632013-11-03 21:31:38 +02001300 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001301 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001302 self.check(os.pathconf, "PC_NAME_MAX")
1303 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001304
Serhiy Storchaka43767632013-11-03 21:31:38 +02001305 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001306 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001307 self.check(os.truncate, 0)
1308 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001309
Serhiy Storchaka43767632013-11-03 21:31:38 +02001310 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001311 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001312 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001313
Serhiy Storchaka43767632013-11-03 21:31:38 +02001314 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001315 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001316 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001317
Victor Stinner57ddf782014-01-08 15:21:28 +01001318 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1319 def test_readv(self):
1320 buf = bytearray(10)
1321 self.check(os.readv, [buf])
1322
Serhiy Storchaka43767632013-11-03 21:31:38 +02001323 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001324 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001325 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001326
Serhiy Storchaka43767632013-11-03 21:31:38 +02001327 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001328 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001329 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001330
Victor Stinner57ddf782014-01-08 15:21:28 +01001331 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1332 def test_writev(self):
1333 self.check(os.writev, [b'abc'])
1334
Brian Curtin1b9df392010-11-24 20:24:31 +00001335
1336class LinkTests(unittest.TestCase):
1337 def setUp(self):
1338 self.file1 = support.TESTFN
1339 self.file2 = os.path.join(support.TESTFN + "2")
1340
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001341 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001342 for file in (self.file1, self.file2):
1343 if os.path.exists(file):
1344 os.unlink(file)
1345
Brian Curtin1b9df392010-11-24 20:24:31 +00001346 def _test_link(self, file1, file2):
1347 with open(file1, "w") as f1:
1348 f1.write("test")
1349
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001350 with warnings.catch_warnings():
1351 warnings.simplefilter("ignore", DeprecationWarning)
1352 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001353 with open(file1, "r") as f1, open(file2, "r") as f2:
1354 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1355
1356 def test_link(self):
1357 self._test_link(self.file1, self.file2)
1358
1359 def test_link_bytes(self):
1360 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1361 bytes(self.file2, sys.getfilesystemencoding()))
1362
Brian Curtinf498b752010-11-30 15:54:04 +00001363 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001364 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001365 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001366 except UnicodeError:
1367 raise unittest.SkipTest("Unable to encode for this platform.")
1368
Brian Curtinf498b752010-11-30 15:54:04 +00001369 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001370 self.file2 = self.file1 + "2"
1371 self._test_link(self.file1, self.file2)
1372
Serhiy Storchaka43767632013-11-03 21:31:38 +02001373@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1374class PosixUidGidTests(unittest.TestCase):
1375 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1376 def test_setuid(self):
1377 if os.getuid() != 0:
1378 self.assertRaises(OSError, os.setuid, 0)
1379 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001380
Serhiy Storchaka43767632013-11-03 21:31:38 +02001381 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1382 def test_setgid(self):
1383 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1384 self.assertRaises(OSError, os.setgid, 0)
1385 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001386
Serhiy Storchaka43767632013-11-03 21:31:38 +02001387 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1388 def test_seteuid(self):
1389 if os.getuid() != 0:
1390 self.assertRaises(OSError, os.seteuid, 0)
1391 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001392
Serhiy Storchaka43767632013-11-03 21:31:38 +02001393 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1394 def test_setegid(self):
1395 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1396 self.assertRaises(OSError, os.setegid, 0)
1397 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001398
Serhiy Storchaka43767632013-11-03 21:31:38 +02001399 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1400 def test_setreuid(self):
1401 if os.getuid() != 0:
1402 self.assertRaises(OSError, os.setreuid, 0, 0)
1403 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1404 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001405
Serhiy Storchaka43767632013-11-03 21:31:38 +02001406 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1407 def test_setreuid_neg1(self):
1408 # Needs to accept -1. We run this in a subprocess to avoid
1409 # altering the test runner's process state (issue8045).
1410 subprocess.check_call([
1411 sys.executable, '-c',
1412 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001413
Serhiy Storchaka43767632013-11-03 21:31:38 +02001414 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1415 def test_setregid(self):
1416 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1417 self.assertRaises(OSError, os.setregid, 0, 0)
1418 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1419 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001420
Serhiy Storchaka43767632013-11-03 21:31:38 +02001421 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1422 def test_setregid_neg1(self):
1423 # Needs to accept -1. We run this in a subprocess to avoid
1424 # altering the test runner's process state (issue8045).
1425 subprocess.check_call([
1426 sys.executable, '-c',
1427 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001428
Serhiy Storchaka43767632013-11-03 21:31:38 +02001429@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1430class Pep383Tests(unittest.TestCase):
1431 def setUp(self):
1432 if support.TESTFN_UNENCODABLE:
1433 self.dir = support.TESTFN_UNENCODABLE
1434 elif support.TESTFN_NONASCII:
1435 self.dir = support.TESTFN_NONASCII
1436 else:
1437 self.dir = support.TESTFN
1438 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001439
Serhiy Storchaka43767632013-11-03 21:31:38 +02001440 bytesfn = []
1441 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001442 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001443 fn = os.fsencode(fn)
1444 except UnicodeEncodeError:
1445 return
1446 bytesfn.append(fn)
1447 add_filename(support.TESTFN_UNICODE)
1448 if support.TESTFN_UNENCODABLE:
1449 add_filename(support.TESTFN_UNENCODABLE)
1450 if support.TESTFN_NONASCII:
1451 add_filename(support.TESTFN_NONASCII)
1452 if not bytesfn:
1453 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001454
Serhiy Storchaka43767632013-11-03 21:31:38 +02001455 self.unicodefn = set()
1456 os.mkdir(self.dir)
1457 try:
1458 for fn in bytesfn:
1459 support.create_empty_file(os.path.join(self.bdir, fn))
1460 fn = os.fsdecode(fn)
1461 if fn in self.unicodefn:
1462 raise ValueError("duplicate filename")
1463 self.unicodefn.add(fn)
1464 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001465 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001466 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001467
Serhiy Storchaka43767632013-11-03 21:31:38 +02001468 def tearDown(self):
1469 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001470
Serhiy Storchaka43767632013-11-03 21:31:38 +02001471 def test_listdir(self):
1472 expected = self.unicodefn
1473 found = set(os.listdir(self.dir))
1474 self.assertEqual(found, expected)
1475 # test listdir without arguments
1476 current_directory = os.getcwd()
1477 try:
1478 os.chdir(os.sep)
1479 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1480 finally:
1481 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001482
Serhiy Storchaka43767632013-11-03 21:31:38 +02001483 def test_open(self):
1484 for fn in self.unicodefn:
1485 f = open(os.path.join(self.dir, fn), 'rb')
1486 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001487
Serhiy Storchaka43767632013-11-03 21:31:38 +02001488 @unittest.skipUnless(hasattr(os, 'statvfs'),
1489 "need os.statvfs()")
1490 def test_statvfs(self):
1491 # issue #9645
1492 for fn in self.unicodefn:
1493 # should not fail with file not found error
1494 fullname = os.path.join(self.dir, fn)
1495 os.statvfs(fullname)
1496
1497 def test_stat(self):
1498 for fn in self.unicodefn:
1499 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001500
Brian Curtineb24d742010-04-12 17:16:38 +00001501@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1502class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001503 def _kill(self, sig):
1504 # Start sys.executable as a subprocess and communicate from the
1505 # subprocess to the parent that the interpreter is ready. When it
1506 # becomes ready, send *sig* via os.kill to the subprocess and check
1507 # that the return code is equal to *sig*.
1508 import ctypes
1509 from ctypes import wintypes
1510 import msvcrt
1511
1512 # Since we can't access the contents of the process' stdout until the
1513 # process has exited, use PeekNamedPipe to see what's inside stdout
1514 # without waiting. This is done so we can tell that the interpreter
1515 # is started and running at a point where it could handle a signal.
1516 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1517 PeekNamedPipe.restype = wintypes.BOOL
1518 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1519 ctypes.POINTER(ctypes.c_char), # stdout buf
1520 wintypes.DWORD, # Buffer size
1521 ctypes.POINTER(wintypes.DWORD), # bytes read
1522 ctypes.POINTER(wintypes.DWORD), # bytes avail
1523 ctypes.POINTER(wintypes.DWORD)) # bytes left
1524 msg = "running"
1525 proc = subprocess.Popen([sys.executable, "-c",
1526 "import sys;"
1527 "sys.stdout.write('{}');"
1528 "sys.stdout.flush();"
1529 "input()".format(msg)],
1530 stdout=subprocess.PIPE,
1531 stderr=subprocess.PIPE,
1532 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001533 self.addCleanup(proc.stdout.close)
1534 self.addCleanup(proc.stderr.close)
1535 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001536
1537 count, max = 0, 100
1538 while count < max and proc.poll() is None:
1539 # Create a string buffer to store the result of stdout from the pipe
1540 buf = ctypes.create_string_buffer(len(msg))
1541 # Obtain the text currently in proc.stdout
1542 # Bytes read/avail/left are left as NULL and unused
1543 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1544 buf, ctypes.sizeof(buf), None, None, None)
1545 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1546 if buf.value:
1547 self.assertEqual(msg, buf.value.decode())
1548 break
1549 time.sleep(0.1)
1550 count += 1
1551 else:
1552 self.fail("Did not receive communication from the subprocess")
1553
Brian Curtineb24d742010-04-12 17:16:38 +00001554 os.kill(proc.pid, sig)
1555 self.assertEqual(proc.wait(), sig)
1556
1557 def test_kill_sigterm(self):
1558 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001559 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001560
1561 def test_kill_int(self):
1562 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001563 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001564
1565 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001566 tagname = "test_os_%s" % uuid.uuid1()
1567 m = mmap.mmap(-1, 1, tagname)
1568 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001569 # Run a script which has console control handling enabled.
1570 proc = subprocess.Popen([sys.executable,
1571 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001572 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001573 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1574 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001575 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001576 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001577 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001578 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001579 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001580 count += 1
1581 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001582 # Forcefully kill the process if we weren't able to signal it.
1583 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001584 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001585 os.kill(proc.pid, event)
1586 # proc.send_signal(event) could also be done here.
1587 # Allow time for the signal to be passed and the process to exit.
1588 time.sleep(0.5)
1589 if not proc.poll():
1590 # Forcefully kill the process if we weren't able to signal it.
1591 os.kill(proc.pid, signal.SIGINT)
1592 self.fail("subprocess did not stop on {}".format(name))
1593
1594 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1595 def test_CTRL_C_EVENT(self):
1596 from ctypes import wintypes
1597 import ctypes
1598
1599 # Make a NULL value by creating a pointer with no argument.
1600 NULL = ctypes.POINTER(ctypes.c_int)()
1601 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1602 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1603 wintypes.BOOL)
1604 SetConsoleCtrlHandler.restype = wintypes.BOOL
1605
1606 # Calling this with NULL and FALSE causes the calling process to
1607 # handle CTRL+C, rather than ignore it. This property is inherited
1608 # by subprocesses.
1609 SetConsoleCtrlHandler(NULL, 0)
1610
1611 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1612
1613 def test_CTRL_BREAK_EVENT(self):
1614 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1615
1616
Brian Curtind40e6f72010-07-08 21:39:08 +00001617@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001618class Win32ListdirTests(unittest.TestCase):
1619 """Test listdir on Windows."""
1620
1621 def setUp(self):
1622 self.created_paths = []
1623 for i in range(2):
1624 dir_name = 'SUB%d' % i
1625 dir_path = os.path.join(support.TESTFN, dir_name)
1626 file_name = 'FILE%d' % i
1627 file_path = os.path.join(support.TESTFN, file_name)
1628 os.makedirs(dir_path)
1629 with open(file_path, 'w') as f:
1630 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1631 self.created_paths.extend([dir_name, file_name])
1632 self.created_paths.sort()
1633
1634 def tearDown(self):
1635 shutil.rmtree(support.TESTFN)
1636
1637 def test_listdir_no_extended_path(self):
1638 """Test when the path is not an "extended" path."""
1639 # unicode
1640 self.assertEqual(
1641 sorted(os.listdir(support.TESTFN)),
1642 self.created_paths)
1643 # bytes
1644 self.assertEqual(
1645 sorted(os.listdir(os.fsencode(support.TESTFN))),
1646 [os.fsencode(path) for path in self.created_paths])
1647
1648 def test_listdir_extended_path(self):
1649 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01001650 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01001651 # unicode
1652 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
1653 self.assertEqual(
1654 sorted(os.listdir(path)),
1655 self.created_paths)
1656 # bytes
1657 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
1658 self.assertEqual(
1659 sorted(os.listdir(path)),
1660 [os.fsencode(path) for path in self.created_paths])
1661
1662
1663@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001664@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001665class Win32SymlinkTests(unittest.TestCase):
1666 filelink = 'filelinktest'
1667 filelink_target = os.path.abspath(__file__)
1668 dirlink = 'dirlinktest'
1669 dirlink_target = os.path.dirname(filelink_target)
1670 missing_link = 'missing link'
1671
1672 def setUp(self):
1673 assert os.path.exists(self.dirlink_target)
1674 assert os.path.exists(self.filelink_target)
1675 assert not os.path.exists(self.dirlink)
1676 assert not os.path.exists(self.filelink)
1677 assert not os.path.exists(self.missing_link)
1678
1679 def tearDown(self):
1680 if os.path.exists(self.filelink):
1681 os.remove(self.filelink)
1682 if os.path.exists(self.dirlink):
1683 os.rmdir(self.dirlink)
1684 if os.path.lexists(self.missing_link):
1685 os.remove(self.missing_link)
1686
1687 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001688 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001689 self.assertTrue(os.path.exists(self.dirlink))
1690 self.assertTrue(os.path.isdir(self.dirlink))
1691 self.assertTrue(os.path.islink(self.dirlink))
1692 self.check_stat(self.dirlink, self.dirlink_target)
1693
1694 def test_file_link(self):
1695 os.symlink(self.filelink_target, self.filelink)
1696 self.assertTrue(os.path.exists(self.filelink))
1697 self.assertTrue(os.path.isfile(self.filelink))
1698 self.assertTrue(os.path.islink(self.filelink))
1699 self.check_stat(self.filelink, self.filelink_target)
1700
1701 def _create_missing_dir_link(self):
1702 'Create a "directory" link to a non-existent target'
1703 linkname = self.missing_link
1704 if os.path.lexists(linkname):
1705 os.remove(linkname)
1706 target = r'c:\\target does not exist.29r3c740'
1707 assert not os.path.exists(target)
1708 target_is_dir = True
1709 os.symlink(target, linkname, target_is_dir)
1710
1711 def test_remove_directory_link_to_missing_target(self):
1712 self._create_missing_dir_link()
1713 # For compatibility with Unix, os.remove will check the
1714 # directory status and call RemoveDirectory if the symlink
1715 # was created with target_is_dir==True.
1716 os.remove(self.missing_link)
1717
1718 @unittest.skip("currently fails; consider for improvement")
1719 def test_isdir_on_directory_link_to_missing_target(self):
1720 self._create_missing_dir_link()
1721 # consider having isdir return true for directory links
1722 self.assertTrue(os.path.isdir(self.missing_link))
1723
1724 @unittest.skip("currently fails; consider for improvement")
1725 def test_rmdir_on_directory_link_to_missing_target(self):
1726 self._create_missing_dir_link()
1727 # consider allowing rmdir to remove directory links
1728 os.rmdir(self.missing_link)
1729
1730 def check_stat(self, link, target):
1731 self.assertEqual(os.stat(link), os.stat(target))
1732 self.assertNotEqual(os.lstat(link), os.stat(link))
1733
Brian Curtind25aef52011-06-13 15:16:04 -05001734 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001735 with warnings.catch_warnings():
1736 warnings.simplefilter("ignore", DeprecationWarning)
1737 self.assertEqual(os.stat(bytes_link), os.stat(target))
1738 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001739
1740 def test_12084(self):
1741 level1 = os.path.abspath(support.TESTFN)
1742 level2 = os.path.join(level1, "level2")
1743 level3 = os.path.join(level2, "level3")
1744 try:
1745 os.mkdir(level1)
1746 os.mkdir(level2)
1747 os.mkdir(level3)
1748
1749 file1 = os.path.abspath(os.path.join(level1, "file1"))
1750
1751 with open(file1, "w") as f:
1752 f.write("file1")
1753
1754 orig_dir = os.getcwd()
1755 try:
1756 os.chdir(level2)
1757 link = os.path.join(level2, "link")
1758 os.symlink(os.path.relpath(file1), "link")
1759 self.assertIn("link", os.listdir(os.getcwd()))
1760
1761 # Check os.stat calls from the same dir as the link
1762 self.assertEqual(os.stat(file1), os.stat("link"))
1763
1764 # Check os.stat calls from a dir below the link
1765 os.chdir(level1)
1766 self.assertEqual(os.stat(file1),
1767 os.stat(os.path.relpath(link)))
1768
1769 # Check os.stat calls from a dir above the link
1770 os.chdir(level3)
1771 self.assertEqual(os.stat(file1),
1772 os.stat(os.path.relpath(link)))
1773 finally:
1774 os.chdir(orig_dir)
1775 except OSError as err:
1776 self.fail(err)
1777 finally:
1778 os.remove(file1)
1779 shutil.rmtree(level1)
1780
Brian Curtind40e6f72010-07-08 21:39:08 +00001781
Jason R. Coombs3a092862013-05-27 23:21:28 -04001782@support.skip_unless_symlink
1783class NonLocalSymlinkTests(unittest.TestCase):
1784
1785 def setUp(self):
1786 """
1787 Create this structure:
1788
1789 base
1790 \___ some_dir
1791 """
1792 os.makedirs('base/some_dir')
1793
1794 def tearDown(self):
1795 shutil.rmtree('base')
1796
1797 def test_directory_link_nonlocal(self):
1798 """
1799 The symlink target should resolve relative to the link, not relative
1800 to the current directory.
1801
1802 Then, link base/some_link -> base/some_dir and ensure that some_link
1803 is resolved as a directory.
1804
1805 In issue13772, it was discovered that directory detection failed if
1806 the symlink target was not specified relative to the current
1807 directory, which was a defect in the implementation.
1808 """
1809 src = os.path.join('base', 'some_link')
1810 os.symlink('some_dir', src)
1811 assert os.path.isdir(src)
1812
1813
Victor Stinnere8d51452010-08-19 01:05:19 +00001814class FSEncodingTests(unittest.TestCase):
1815 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001816 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1817 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001818
Victor Stinnere8d51452010-08-19 01:05:19 +00001819 def test_identity(self):
1820 # assert fsdecode(fsencode(x)) == x
1821 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1822 try:
1823 bytesfn = os.fsencode(fn)
1824 except UnicodeEncodeError:
1825 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001826 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001827
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001828
Brett Cannonefb00c02012-02-29 18:31:31 -05001829
1830class DeviceEncodingTests(unittest.TestCase):
1831
1832 def test_bad_fd(self):
1833 # Return None when an fd doesn't actually exist.
1834 self.assertIsNone(os.device_encoding(123456))
1835
Philip Jenveye308b7c2012-02-29 16:16:15 -08001836 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
1837 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08001838 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05001839 def test_device_encoding(self):
1840 encoding = os.device_encoding(0)
1841 self.assertIsNotNone(encoding)
1842 self.assertTrue(codecs.lookup(encoding))
1843
1844
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001845class PidTests(unittest.TestCase):
1846 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1847 def test_getppid(self):
1848 p = subprocess.Popen([sys.executable, '-c',
1849 'import os; print(os.getppid())'],
1850 stdout=subprocess.PIPE)
1851 stdout, _ = p.communicate()
1852 # We are the parent of our subprocess
1853 self.assertEqual(int(stdout), os.getpid())
1854
1855
Brian Curtin0151b8e2010-09-24 13:43:43 +00001856# The introduction of this TestCase caused at least two different errors on
1857# *nix buildbots. Temporarily skip this to let the buildbots move along.
1858@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001859@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1860class LoginTests(unittest.TestCase):
1861 def test_getlogin(self):
1862 user_name = os.getlogin()
1863 self.assertNotEqual(len(user_name), 0)
1864
1865
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001866@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1867 "needs os.getpriority and os.setpriority")
1868class ProgramPriorityTests(unittest.TestCase):
1869 """Tests for os.getpriority() and os.setpriority()."""
1870
1871 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001872
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001873 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1874 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1875 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001876 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1877 if base >= 19 and new_prio <= 19:
1878 raise unittest.SkipTest(
1879 "unable to reliably test setpriority at current nice level of %s" % base)
1880 else:
1881 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001882 finally:
1883 try:
1884 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1885 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001886 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001887 raise
1888
1889
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001890if threading is not None:
1891 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001892
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001893 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001894
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001895 def __init__(self, conn):
1896 asynchat.async_chat.__init__(self, conn)
1897 self.in_buffer = []
1898 self.closed = False
1899 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001900
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001901 def handle_read(self):
1902 data = self.recv(4096)
1903 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001904
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001905 def get_data(self):
1906 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001907
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001908 def handle_close(self):
1909 self.close()
1910 self.closed = True
1911
1912 def handle_error(self):
1913 raise
1914
1915 def __init__(self, address):
1916 threading.Thread.__init__(self)
1917 asyncore.dispatcher.__init__(self)
1918 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1919 self.bind(address)
1920 self.listen(5)
1921 self.host, self.port = self.socket.getsockname()[:2]
1922 self.handler_instance = None
1923 self._active = False
1924 self._active_lock = threading.Lock()
1925
1926 # --- public API
1927
1928 @property
1929 def running(self):
1930 return self._active
1931
1932 def start(self):
1933 assert not self.running
1934 self.__flag = threading.Event()
1935 threading.Thread.start(self)
1936 self.__flag.wait()
1937
1938 def stop(self):
1939 assert self.running
1940 self._active = False
1941 self.join()
1942
1943 def wait(self):
1944 # wait for handler connection to be closed, then stop the server
1945 while not getattr(self.handler_instance, "closed", False):
1946 time.sleep(0.001)
1947 self.stop()
1948
1949 # --- internals
1950
1951 def run(self):
1952 self._active = True
1953 self.__flag.set()
1954 while self._active and asyncore.socket_map:
1955 self._active_lock.acquire()
1956 asyncore.loop(timeout=0.001, count=1)
1957 self._active_lock.release()
1958 asyncore.close_all()
1959
1960 def handle_accept(self):
1961 conn, addr = self.accept()
1962 self.handler_instance = self.Handler(conn)
1963
1964 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001965 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001966 handle_read = handle_connect
1967
1968 def writable(self):
1969 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001970
1971 def handle_error(self):
1972 raise
1973
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001974
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001975@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001976@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1977class TestSendfile(unittest.TestCase):
1978
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001979 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001980 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001981 not sys.platform.startswith("solaris") and \
1982 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02001983 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
1984 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001985
1986 @classmethod
1987 def setUpClass(cls):
1988 with open(support.TESTFN, "wb") as f:
1989 f.write(cls.DATA)
1990
1991 @classmethod
1992 def tearDownClass(cls):
1993 support.unlink(support.TESTFN)
1994
1995 def setUp(self):
1996 self.server = SendfileTestServer((support.HOST, 0))
1997 self.server.start()
1998 self.client = socket.socket()
1999 self.client.connect((self.server.host, self.server.port))
2000 self.client.settimeout(1)
2001 # synchronize by waiting for "220 ready" response
2002 self.client.recv(1024)
2003 self.sockno = self.client.fileno()
2004 self.file = open(support.TESTFN, 'rb')
2005 self.fileno = self.file.fileno()
2006
2007 def tearDown(self):
2008 self.file.close()
2009 self.client.close()
2010 if self.server.running:
2011 self.server.stop()
2012
2013 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2014 """A higher level wrapper representing how an application is
2015 supposed to use sendfile().
2016 """
2017 while 1:
2018 try:
2019 if self.SUPPORT_HEADERS_TRAILERS:
2020 return os.sendfile(sock, file, offset, nbytes, headers,
2021 trailers)
2022 else:
2023 return os.sendfile(sock, file, offset, nbytes)
2024 except OSError as err:
2025 if err.errno == errno.ECONNRESET:
2026 # disconnected
2027 raise
2028 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2029 # we have to retry send data
2030 continue
2031 else:
2032 raise
2033
2034 def test_send_whole_file(self):
2035 # normal send
2036 total_sent = 0
2037 offset = 0
2038 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002039 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002040 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2041 if sent == 0:
2042 break
2043 offset += sent
2044 total_sent += sent
2045 self.assertTrue(sent <= nbytes)
2046 self.assertEqual(offset, total_sent)
2047
2048 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002049 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002050 self.client.close()
2051 self.server.wait()
2052 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002053 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002054 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002055
2056 def test_send_at_certain_offset(self):
2057 # start sending a file at a certain offset
2058 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002059 offset = len(self.DATA) // 2
2060 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002061 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002062 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002063 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2064 if sent == 0:
2065 break
2066 offset += sent
2067 total_sent += sent
2068 self.assertTrue(sent <= nbytes)
2069
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002070 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002071 self.client.close()
2072 self.server.wait()
2073 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002074 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002075 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002076 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002077 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002078
2079 def test_offset_overflow(self):
2080 # specify an offset > file size
2081 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002082 try:
2083 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2084 except OSError as e:
2085 # Solaris can raise EINVAL if offset >= file length, ignore.
2086 if e.errno != errno.EINVAL:
2087 raise
2088 else:
2089 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002090 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002091 self.client.close()
2092 self.server.wait()
2093 data = self.server.handler_instance.get_data()
2094 self.assertEqual(data, b'')
2095
2096 def test_invalid_offset(self):
2097 with self.assertRaises(OSError) as cm:
2098 os.sendfile(self.sockno, self.fileno, -1, 4096)
2099 self.assertEqual(cm.exception.errno, errno.EINVAL)
2100
2101 # --- headers / trailers tests
2102
Serhiy Storchaka43767632013-11-03 21:31:38 +02002103 @requires_headers_trailers
2104 def test_headers(self):
2105 total_sent = 0
2106 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2107 headers=[b"x" * 512])
2108 total_sent += sent
2109 offset = 4096
2110 nbytes = 4096
2111 while 1:
2112 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2113 offset, nbytes)
2114 if sent == 0:
2115 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002116 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002117 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002118
Serhiy Storchaka43767632013-11-03 21:31:38 +02002119 expected_data = b"x" * 512 + self.DATA
2120 self.assertEqual(total_sent, len(expected_data))
2121 self.client.close()
2122 self.server.wait()
2123 data = self.server.handler_instance.get_data()
2124 self.assertEqual(hash(data), hash(expected_data))
2125
2126 @requires_headers_trailers
2127 def test_trailers(self):
2128 TESTFN2 = support.TESTFN + "2"
2129 file_data = b"abcdef"
2130 with open(TESTFN2, 'wb') as f:
2131 f.write(file_data)
2132 with open(TESTFN2, 'rb')as f:
2133 self.addCleanup(os.remove, TESTFN2)
2134 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2135 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002136 self.client.close()
2137 self.server.wait()
2138 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002139 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002140
Serhiy Storchaka43767632013-11-03 21:31:38 +02002141 @requires_headers_trailers
2142 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2143 'test needs os.SF_NODISKIO')
2144 def test_flags(self):
2145 try:
2146 os.sendfile(self.sockno, self.fileno, 0, 4096,
2147 flags=os.SF_NODISKIO)
2148 except OSError as err:
2149 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2150 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002151
2152
Larry Hastings9cf065c2012-06-22 16:30:09 -07002153def supports_extended_attributes():
2154 if not hasattr(os, "setxattr"):
2155 return False
2156 try:
2157 with open(support.TESTFN, "wb") as fp:
2158 try:
2159 os.setxattr(fp.fileno(), b"user.test", b"")
2160 except OSError:
2161 return False
2162 finally:
2163 support.unlink(support.TESTFN)
2164 # Kernels < 2.6.39 don't respect setxattr flags.
2165 kernel_version = platform.release()
2166 m = re.match("2.6.(\d{1,2})", kernel_version)
2167 return m is None or int(m.group(1)) >= 39
2168
2169
2170@unittest.skipUnless(supports_extended_attributes(),
2171 "no non-broken extended attribute support")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002172class ExtendedAttributeTests(unittest.TestCase):
2173
2174 def tearDown(self):
2175 support.unlink(support.TESTFN)
2176
Larry Hastings9cf065c2012-06-22 16:30:09 -07002177 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002178 fn = support.TESTFN
2179 open(fn, "wb").close()
2180 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002181 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002182 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002183 init_xattr = listxattr(fn)
2184 self.assertIsInstance(init_xattr, list)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002185 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002186 xattr = set(init_xattr)
2187 xattr.add("user.test")
2188 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002189 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2190 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2191 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002192 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002193 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002194 self.assertEqual(cm.exception.errno, errno.EEXIST)
2195 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002196 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002197 self.assertEqual(cm.exception.errno, errno.ENODATA)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002198 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002199 xattr.add("user.test2")
2200 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002201 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002202 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002203 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002204 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002205 xattr.remove("user.test")
2206 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002207 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2208 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2209 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2210 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002211 many = sorted("user.test{}".format(i) for i in range(100))
2212 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002213 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002214 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002215
Larry Hastings9cf065c2012-06-22 16:30:09 -07002216 def _check_xattrs(self, *args, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002217 def make_bytes(s):
2218 return bytes(s, "ascii")
Larry Hastings9cf065c2012-06-22 16:30:09 -07002219 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002220 support.unlink(support.TESTFN)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002221 self._check_xattrs_str(make_bytes, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002222
2223 def test_simple(self):
2224 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2225 os.listxattr)
2226
2227 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002228 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2229 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002230
2231 def test_fds(self):
2232 def getxattr(path, *args):
2233 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002234 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002235 def setxattr(path, *args):
2236 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002237 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002238 def removexattr(path, *args):
2239 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002240 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002241 def listxattr(path, *args):
2242 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002243 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002244 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2245
2246
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002247@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2248class Win32DeprecatedBytesAPI(unittest.TestCase):
2249 def test_deprecated(self):
2250 import nt
2251 filename = os.fsencode(support.TESTFN)
2252 with warnings.catch_warnings():
2253 warnings.simplefilter("error", DeprecationWarning)
2254 for func, *args in (
2255 (nt._getfullpathname, filename),
2256 (nt._isdir, filename),
2257 (os.access, filename, os.R_OK),
2258 (os.chdir, filename),
2259 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01002260 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002261 (os.link, filename, filename),
2262 (os.listdir, filename),
2263 (os.lstat, filename),
2264 (os.mkdir, filename),
2265 (os.open, filename, os.O_RDONLY),
2266 (os.rename, filename, filename),
2267 (os.rmdir, filename),
2268 (os.startfile, filename),
2269 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002270 (os.unlink, filename),
2271 (os.utime, filename),
2272 ):
2273 self.assertRaises(DeprecationWarning, func, *args)
2274
Victor Stinner28216442011-11-16 00:34:44 +01002275 @support.skip_unless_symlink
2276 def test_symlink(self):
2277 filename = os.fsencode(support.TESTFN)
2278 with warnings.catch_warnings():
2279 warnings.simplefilter("error", DeprecationWarning)
2280 self.assertRaises(DeprecationWarning,
2281 os.symlink, filename, filename)
2282
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002283
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002284@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2285class TermsizeTests(unittest.TestCase):
2286 def test_does_not_crash(self):
2287 """Check if get_terminal_size() returns a meaningful value.
2288
2289 There's no easy portable way to actually check the size of the
2290 terminal, so let's check if it returns something sensible instead.
2291 """
2292 try:
2293 size = os.get_terminal_size()
2294 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002295 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002296 # Under win32 a generic OSError can be thrown if the
2297 # handle cannot be retrieved
2298 self.skipTest("failed to query terminal size")
2299 raise
2300
Antoine Pitroucfade362012-02-08 23:48:59 +01002301 self.assertGreaterEqual(size.columns, 0)
2302 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002303
2304 def test_stty_match(self):
2305 """Check if stty returns the same results
2306
2307 stty actually tests stdin, so get_terminal_size is invoked on
2308 stdin explicitly. If stty succeeded, then get_terminal_size()
2309 should work too.
2310 """
2311 try:
2312 size = subprocess.check_output(['stty', 'size']).decode().split()
2313 except (FileNotFoundError, subprocess.CalledProcessError):
2314 self.skipTest("stty invocation failed")
2315 expected = (int(size[1]), int(size[0])) # reversed order
2316
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002317 try:
2318 actual = os.get_terminal_size(sys.__stdin__.fileno())
2319 except OSError as e:
2320 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2321 # Under win32 a generic OSError can be thrown if the
2322 # handle cannot be retrieved
2323 self.skipTest("failed to query terminal size")
2324 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002325 self.assertEqual(expected, actual)
2326
2327
Victor Stinner292c8352012-10-30 02:17:38 +01002328class OSErrorTests(unittest.TestCase):
2329 def setUp(self):
2330 class Str(str):
2331 pass
2332
Victor Stinnerafe17062012-10-31 22:47:43 +01002333 self.bytes_filenames = []
2334 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002335 if support.TESTFN_UNENCODABLE is not None:
2336 decoded = support.TESTFN_UNENCODABLE
2337 else:
2338 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002339 self.unicode_filenames.append(decoded)
2340 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002341 if support.TESTFN_UNDECODABLE is not None:
2342 encoded = support.TESTFN_UNDECODABLE
2343 else:
2344 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002345 self.bytes_filenames.append(encoded)
2346 self.bytes_filenames.append(memoryview(encoded))
2347
2348 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002349
2350 def test_oserror_filename(self):
2351 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002352 (self.filenames, os.chdir,),
2353 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002354 (self.filenames, os.lstat,),
2355 (self.filenames, os.open, os.O_RDONLY),
2356 (self.filenames, os.rmdir,),
2357 (self.filenames, os.stat,),
2358 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002359 ]
2360 if sys.platform == "win32":
2361 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002362 (self.bytes_filenames, os.rename, b"dst"),
2363 (self.bytes_filenames, os.replace, b"dst"),
2364 (self.unicode_filenames, os.rename, "dst"),
2365 (self.unicode_filenames, os.replace, "dst"),
Victor Stinner64e039a2012-11-07 00:10:14 +01002366 # Issue #16414: Don't test undecodable names with listdir()
2367 # because of a Windows bug.
2368 #
2369 # With the ANSI code page 932, os.listdir(b'\xe7') return an
2370 # empty list (instead of failing), whereas os.listdir(b'\xff')
2371 # raises a FileNotFoundError. It looks like a Windows bug:
2372 # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
2373 # fails with ERROR_FILE_NOT_FOUND (2), instead of
2374 # ERROR_PATH_NOT_FOUND (3).
2375 (self.unicode_filenames, os.listdir,),
Victor Stinner292c8352012-10-30 02:17:38 +01002376 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002377 else:
2378 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002379 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002380 (self.filenames, os.rename, "dst"),
2381 (self.filenames, os.replace, "dst"),
2382 ))
2383 if hasattr(os, "chown"):
2384 funcs.append((self.filenames, os.chown, 0, 0))
2385 if hasattr(os, "lchown"):
2386 funcs.append((self.filenames, os.lchown, 0, 0))
2387 if hasattr(os, "truncate"):
2388 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002389 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002390 funcs.append((self.filenames, os.chflags, 0))
2391 if hasattr(os, "lchflags"):
2392 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002393 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002394 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002395 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002396 if sys.platform == "win32":
2397 funcs.append((self.bytes_filenames, os.link, b"dst"))
2398 funcs.append((self.unicode_filenames, os.link, "dst"))
2399 else:
2400 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002401 if hasattr(os, "listxattr"):
2402 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002403 (self.filenames, os.listxattr,),
2404 (self.filenames, os.getxattr, "user.test"),
2405 (self.filenames, os.setxattr, "user.test", b'user'),
2406 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002407 ))
2408 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002409 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002410 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002411 if sys.platform == "win32":
2412 funcs.append((self.unicode_filenames, os.readlink,))
2413 else:
2414 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002415
Victor Stinnerafe17062012-10-31 22:47:43 +01002416 for filenames, func, *func_args in funcs:
2417 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002418 try:
2419 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002420 except OSError as err:
Victor Stinner292c8352012-10-30 02:17:38 +01002421 self.assertIs(err.filename, name)
2422 else:
2423 self.fail("No exception thrown by {}".format(func))
2424
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002425class CPUCountTests(unittest.TestCase):
2426 def test_cpu_count(self):
2427 cpus = os.cpu_count()
2428 if cpus is not None:
2429 self.assertIsInstance(cpus, int)
2430 self.assertGreater(cpus, 0)
2431 else:
2432 self.skipTest("Could not determine the number of CPUs")
2433
Victor Stinnerdaf45552013-08-28 00:53:59 +02002434
2435class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002436 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002437 fd = os.open(__file__, os.O_RDONLY)
2438 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002439 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002440
Victor Stinnerdaf45552013-08-28 00:53:59 +02002441 os.set_inheritable(fd, True)
2442 self.assertEqual(os.get_inheritable(fd), True)
2443
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002444 @unittest.skipIf(fcntl is None, "need fcntl")
2445 def test_get_inheritable_cloexec(self):
2446 fd = os.open(__file__, os.O_RDONLY)
2447 self.addCleanup(os.close, fd)
2448 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002449
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002450 # clear FD_CLOEXEC flag
2451 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2452 flags &= ~fcntl.FD_CLOEXEC
2453 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002454
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002455 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002456
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002457 @unittest.skipIf(fcntl is None, "need fcntl")
2458 def test_set_inheritable_cloexec(self):
2459 fd = os.open(__file__, os.O_RDONLY)
2460 self.addCleanup(os.close, fd)
2461 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2462 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002463
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002464 os.set_inheritable(fd, True)
2465 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2466 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002467
Victor Stinnerdaf45552013-08-28 00:53:59 +02002468 def test_open(self):
2469 fd = os.open(__file__, os.O_RDONLY)
2470 self.addCleanup(os.close, fd)
2471 self.assertEqual(os.get_inheritable(fd), False)
2472
2473 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2474 def test_pipe(self):
2475 rfd, wfd = os.pipe()
2476 self.addCleanup(os.close, rfd)
2477 self.addCleanup(os.close, wfd)
2478 self.assertEqual(os.get_inheritable(rfd), False)
2479 self.assertEqual(os.get_inheritable(wfd), False)
2480
2481 def test_dup(self):
2482 fd1 = os.open(__file__, os.O_RDONLY)
2483 self.addCleanup(os.close, fd1)
2484
2485 fd2 = os.dup(fd1)
2486 self.addCleanup(os.close, fd2)
2487 self.assertEqual(os.get_inheritable(fd2), False)
2488
2489 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2490 def test_dup2(self):
2491 fd = os.open(__file__, os.O_RDONLY)
2492 self.addCleanup(os.close, fd)
2493
2494 # inheritable by default
2495 fd2 = os.open(__file__, os.O_RDONLY)
2496 try:
2497 os.dup2(fd, fd2)
2498 self.assertEqual(os.get_inheritable(fd2), True)
2499 finally:
2500 os.close(fd2)
2501
2502 # force non-inheritable
2503 fd3 = os.open(__file__, os.O_RDONLY)
2504 try:
2505 os.dup2(fd, fd3, inheritable=False)
2506 self.assertEqual(os.get_inheritable(fd3), False)
2507 finally:
2508 os.close(fd3)
2509
2510 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2511 def test_openpty(self):
2512 master_fd, slave_fd = os.openpty()
2513 self.addCleanup(os.close, master_fd)
2514 self.addCleanup(os.close, slave_fd)
2515 self.assertEqual(os.get_inheritable(master_fd), False)
2516 self.assertEqual(os.get_inheritable(slave_fd), False)
2517
2518
Antoine Pitrouf26ad712011-07-15 23:00:56 +02002519@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00002520def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002521 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00002522 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002523 StatAttributeTests,
2524 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002525 WalkTests,
Charles-François Natali7372b062012-02-05 15:15:38 +01002526 FwalkTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002527 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00002528 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00002529 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00002530 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002531 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002532 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00002533 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00002534 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002535 Win32KillTests,
Tim Golden781bbeb2013-10-25 20:24:06 +01002536 Win32ListdirTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00002537 Win32SymlinkTests,
Jason R. Coombs3a092862013-05-27 23:21:28 -04002538 NonLocalSymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00002539 FSEncodingTests,
Brett Cannonefb00c02012-02-29 18:31:31 -05002540 DeviceEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002541 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002542 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00002543 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002544 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002545 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04002546 ExtendedAttributeTests,
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002547 Win32DeprecatedBytesAPI,
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002548 TermsizeTests,
Victor Stinner292c8352012-10-30 02:17:38 +01002549 OSErrorTests,
Andrew Svetlov405faed2012-12-25 12:18:09 +02002550 RemoveDirsTests,
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002551 CPUCountTests,
Victor Stinnerdaf45552013-08-28 00:53:59 +02002552 FDInheritanceTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002553 )
Fred Drake2e2be372001-09-20 21:33:42 +00002554
2555if __name__ == "__main__":
2556 test_main()