blob: 54dd9da2a4e65bd9aea2fdb185250202476252a6 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinnere12e7aa2015-06-12 21:58:00 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinnere12e7aa2015-06-12 21:58:00 +02009import decimal
10import errno
11import fractions
12import itertools
13import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000014import mmap
Victor Stinnere12e7aa2015-06-12 21:58:00 +020015import os
16import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Victor Stinnere12e7aa2015-06-12 21:58:00 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Victor Stinnere12e7aa2015-06-12 21:58:00 +020023import subprocess
24import sys
Victor Stinnerfe02e392014-12-21 01:16:38 +010025import sysconfig
Victor Stinnere12e7aa2015-06-12 21:58:00 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000031try:
32 import threading
33except ImportError:
34 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020043
Georg Brandl2daf6ae2012-02-20 19:54:16 +010044from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000045
Mark Dickinson7cf03892010-04-16 13:45:35 +000046# Detect whether we're on a Linux system that uses the (now outdated
47# and unmaintained) linuxthreads threading library. There's an issue
48# when combining linuxthreads with a failed execv call: see
49# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020050if hasattr(sys, 'thread_info') and sys.thread_info.version:
51 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
52else:
53 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000054
Stefan Krahebee49a2013-01-17 15:31:00 +010055# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
56HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
57
Thomas Wouters0e3f5912006-08-11 14:57:12 +000058# Tests creating TESTFN
59class FileTests(unittest.TestCase):
60 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +000061 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000062 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000063 tearDown = setUp
64
65 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000066 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000067 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000068 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000069
Christian Heimesfdab48e2008-01-20 09:06:41 +000070 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000071 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
72 # We must allocate two consecutive file descriptors, otherwise
73 # it will mess up other file descriptors (perhaps even the three
74 # standard ones).
75 second = os.dup(first)
76 try:
77 retries = 0
78 while second != first + 1:
79 os.close(first)
80 retries += 1
81 if retries > 10:
82 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000083 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000084 first, second = second, os.dup(second)
85 finally:
86 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000087 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000088 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000089 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000090
Benjamin Peterson1cc6df92010-06-30 17:39:45 +000091 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +000092 def test_rename(self):
93 path = support.TESTFN
94 old = sys.getrefcount(path)
95 self.assertRaises(TypeError, os.rename, path, 0)
96 new = sys.getrefcount(path)
97 self.assertEqual(old, new)
98
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000099 def test_read(self):
100 with open(support.TESTFN, "w+b") as fobj:
101 fobj.write(b"spam")
102 fobj.flush()
103 fd = fobj.fileno()
104 os.lseek(fd, 0, 0)
105 s = os.read(fd, 4)
106 self.assertEqual(type(s), bytes)
107 self.assertEqual(s, b"spam")
108
109 def test_write(self):
110 # os.write() accepts bytes- and buffer-like objects but not strings
111 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
112 self.assertRaises(TypeError, os.write, fd, "beans")
113 os.write(fd, b"bacon\n")
114 os.write(fd, bytearray(b"eggs\n"))
115 os.write(fd, memoryview(b"spam\n"))
116 os.close(fd)
117 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000118 self.assertEqual(fobj.read().splitlines(),
119 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000120
Victor Stinnere0daff12011-03-20 23:36:35 +0100121 def write_windows_console(self, *args):
122 retcode = subprocess.call(args,
123 # use a new console to not flood the test output
124 creationflags=subprocess.CREATE_NEW_CONSOLE,
125 # use a shell to hide the console window (SW_HIDE)
126 shell=True)
127 self.assertEqual(retcode, 0)
128
129 @unittest.skipUnless(sys.platform == 'win32',
130 'test specific to the Windows console')
131 def test_write_windows_console(self):
132 # Issue #11395: the Windows console returns an error (12: not enough
133 # space error) on writing into stdout if stdout mode is binary and the
134 # length is greater than 66,000 bytes (or less, depending on heap
135 # usage).
136 code = "print('x' * 100000)"
137 self.write_windows_console(sys.executable, "-c", code)
138 self.write_windows_console(sys.executable, "-u", "-c", code)
139
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000140 def fdopen_helper(self, *args):
141 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200142 f = os.fdopen(fd, *args)
143 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000144
145 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200146 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
147 os.close(fd)
148
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000149 self.fdopen_helper()
150 self.fdopen_helper('r')
151 self.fdopen_helper('r', 100)
152
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100153 def test_replace(self):
154 TESTFN2 = support.TESTFN + ".2"
155 with open(support.TESTFN, 'w') as f:
156 f.write("1")
157 with open(TESTFN2, 'w') as f:
158 f.write("2")
159 self.addCleanup(os.unlink, TESTFN2)
160 os.replace(support.TESTFN, TESTFN2)
161 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
162 with open(TESTFN2, 'r') as f:
163 self.assertEqual(f.read(), "1")
164
Martin Panterbf19d162015-09-09 01:01:13 +0000165 def test_open_keywords(self):
166 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
167 dir_fd=None)
168 os.close(f)
169
170 def test_symlink_keywords(self):
171 symlink = support.get_attribute(os, "symlink")
172 try:
173 symlink(src='target', dst=support.TESTFN,
174 target_is_directory=False, dir_fd=None)
175 except (NotImplementedError, OSError):
176 pass # No OS support or unprivileged user
177
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200178
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000179# Test attributes on return values from os.*stat* family.
180class StatAttributeTests(unittest.TestCase):
181 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000182 os.mkdir(support.TESTFN)
183 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000184 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000185 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000186 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000187
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000188 def tearDown(self):
189 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000190 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000191
Serhiy Storchaka43767632013-11-03 21:31:38 +0200192 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000193 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000194 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000195
196 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000197 self.assertEqual(result[stat.ST_SIZE], 3)
198 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000199
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000200 # Make sure all the attributes are there
201 members = dir(result)
202 for name in dir(stat):
203 if name[:3] == 'ST_':
204 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000205 if name.endswith("TIME"):
206 def trunc(x): return int(x)
207 else:
208 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000209 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000210 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000211 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000212
Larry Hastings6fe20b32012-04-19 15:07:49 -0700213 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700214 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700215 for name in 'st_atime st_mtime st_ctime'.split():
216 floaty = int(getattr(result, name) * 100000)
217 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700218 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700219
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000220 try:
221 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200222 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000223 except IndexError:
224 pass
225
226 # Make sure that assignment fails
227 try:
228 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200229 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000230 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000231 pass
232
233 try:
234 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200235 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000236 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000237 pass
238
239 try:
240 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200241 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000242 except AttributeError:
243 pass
244
245 # Use the stat_result constructor with a too-short tuple.
246 try:
247 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200248 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249 except TypeError:
250 pass
251
Ezio Melotti42da6632011-03-15 05:18:48 +0200252 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000253 try:
254 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
255 except TypeError:
256 pass
257
Antoine Pitrou38425292010-09-21 18:19:07 +0000258 def test_stat_attributes(self):
259 self.check_stat_attributes(self.fname)
260
261 def test_stat_attributes_bytes(self):
262 try:
263 fname = self.fname.encode(sys.getfilesystemencoding())
264 except UnicodeEncodeError:
265 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100266 with warnings.catch_warnings():
267 warnings.simplefilter("ignore", DeprecationWarning)
268 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000269
Christian Heimes25827622013-10-12 01:27:08 +0200270 def test_stat_result_pickle(self):
271 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200272 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
273 p = pickle.dumps(result, proto)
274 self.assertIn(b'stat_result', p)
275 if proto < 4:
276 self.assertIn(b'cos\nstat_result\n', p)
277 unpickled = pickle.loads(p)
278 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200279
Serhiy Storchaka43767632013-11-03 21:31:38 +0200280 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000281 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000282 try:
283 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000284 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000285 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000286 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200287 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000288
289 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000290 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000291
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000292 # Make sure all the attributes are there.
293 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
294 'ffree', 'favail', 'flag', 'namemax')
295 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000296 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000297
298 # Make sure that assignment really fails
299 try:
300 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200301 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000302 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000303 pass
304
305 try:
306 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200307 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000308 except AttributeError:
309 pass
310
311 # Use the constructor with a too-short tuple.
312 try:
313 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200314 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000315 except TypeError:
316 pass
317
Ezio Melotti42da6632011-03-15 05:18:48 +0200318 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000319 try:
320 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
321 except TypeError:
322 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000323
Christian Heimes25827622013-10-12 01:27:08 +0200324 @unittest.skipUnless(hasattr(os, 'statvfs'),
325 "need os.statvfs()")
326 def test_statvfs_result_pickle(self):
327 try:
328 result = os.statvfs(self.fname)
329 except OSError as e:
330 # On AtheOS, glibc always returns ENOSYS
331 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200332 self.skipTest('os.statvfs() failed with ENOSYS')
333
Serhiy Storchakabad12572014-12-15 14:03:42 +0200334 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
335 p = pickle.dumps(result, proto)
336 self.assertIn(b'statvfs_result', p)
337 if proto < 4:
338 self.assertIn(b'cos\nstatvfs_result\n', p)
339 unpickled = pickle.loads(p)
340 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200341
Serhiy Storchaka43767632013-11-03 21:31:38 +0200342 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
343 def test_1686475(self):
344 # Verify that an open file can be stat'ed
345 try:
346 os.stat(r"c:\pagefile.sys")
347 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600348 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200349 except OSError as e:
350 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000351
Serhiy Storchaka43767632013-11-03 21:31:38 +0200352 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
353 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
354 def test_15261(self):
355 # Verify that stat'ing a closed fd does not cause crash
356 r, w = os.pipe()
357 try:
358 os.stat(r) # should not raise error
359 finally:
360 os.close(r)
361 os.close(w)
362 with self.assertRaises(OSError) as ctx:
363 os.stat(r)
364 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100365
Victor Stinnere12e7aa2015-06-12 21:58:00 +0200366
367class UtimeTests(unittest.TestCase):
368 def setUp(self):
369 self.dirname = support.TESTFN
370 self.fname = os.path.join(self.dirname, "f1")
371
372 self.addCleanup(support.rmtree, self.dirname)
373 os.mkdir(self.dirname)
374 with open(self.fname, 'wb') as fp:
375 fp.write(b"ABC")
376
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200377 def restore_float_times(state):
378 with warnings.catch_warnings():
379 warnings.simplefilter("ignore", DeprecationWarning)
380
381 os.stat_float_times(state)
382
Victor Stinnere12e7aa2015-06-12 21:58:00 +0200383 # ensure that st_atime and st_mtime are float
384 with warnings.catch_warnings():
385 warnings.simplefilter("ignore", DeprecationWarning)
386
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200387 old_float_times = os.stat_float_times(-1)
388 self.addCleanup(restore_float_times, old_float_times)
Victor Stinnere12e7aa2015-06-12 21:58:00 +0200389
390 os.stat_float_times(True)
391
392 def support_subsecond(self, filename):
393 # Heuristic to check if the filesystem supports timestamp with
394 # subsecond resolution: check if float and int timestamps are different
395 st = os.stat(filename)
396 return ((st.st_atime != st[7])
397 or (st.st_mtime != st[8])
398 or (st.st_ctime != st[9]))
399
400 def _test_utime(self, set_time, filename=None):
401 if not filename:
402 filename = self.fname
403
404 support_subsecond = self.support_subsecond(filename)
405 if support_subsecond:
406 # Timestamp with a resolution of 1 microsecond (10^-6).
407 #
408 # The resolution of the C internal function used by os.utime()
409 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
410 # test with a resolution of 1 ns requires more work:
411 # see the issue #15745.
412 atime_ns = 1002003000 # 1.002003 seconds
413 mtime_ns = 4005006000 # 4.005006 seconds
414 else:
415 # use a resolution of 1 second
416 atime_ns = 5 * 10**9
417 mtime_ns = 8 * 10**9
418
419 set_time(filename, (atime_ns, mtime_ns))
420 st = os.stat(filename)
421
422 if support_subsecond:
423 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
424 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
425 else:
426 self.assertEqual(st.st_atime, atime_ns * 1e-9)
427 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
428 self.assertEqual(st.st_atime_ns, atime_ns)
429 self.assertEqual(st.st_mtime_ns, mtime_ns)
430
431 def test_utime(self):
432 def set_time(filename, ns):
433 # test the ns keyword parameter
434 os.utime(filename, ns=ns)
435 self._test_utime(set_time)
436
437 @staticmethod
438 def ns_to_sec(ns):
439 # Convert a number of nanosecond (int) to a number of seconds (float).
440 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
441 # issue, os.utime() rounds towards minus infinity.
442 return (ns * 1e-9) + 0.5e-9
443
444 def test_utime_by_indexed(self):
445 # pass times as floating point seconds as the second indexed parameter
446 def set_time(filename, ns):
447 atime_ns, mtime_ns = ns
448 atime = self.ns_to_sec(atime_ns)
449 mtime = self.ns_to_sec(mtime_ns)
450 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
451 # or utime(time_t)
452 os.utime(filename, (atime, mtime))
453 self._test_utime(set_time)
454
455 def test_utime_by_times(self):
456 def set_time(filename, ns):
457 atime_ns, mtime_ns = ns
458 atime = self.ns_to_sec(atime_ns)
459 mtime = self.ns_to_sec(mtime_ns)
460 # test the times keyword parameter
461 os.utime(filename, times=(atime, mtime))
462 self._test_utime(set_time)
463
464 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
465 "follow_symlinks support for utime required "
466 "for this test.")
467 def test_utime_nofollow_symlinks(self):
468 def set_time(filename, ns):
469 # use follow_symlinks=False to test utimensat(timespec)
470 # or lutimes(timeval)
471 os.utime(filename, ns=ns, follow_symlinks=False)
472 self._test_utime(set_time)
473
474 @unittest.skipUnless(os.utime in os.supports_fd,
475 "fd support for utime required for this test.")
476 def test_utime_fd(self):
477 def set_time(filename, ns):
478 with open(filename, 'wb') as fp:
479 # use a file descriptor to test futimens(timespec)
480 # or futimes(timeval)
481 os.utime(fp.fileno(), ns=ns)
482 self._test_utime(set_time)
483
484 @unittest.skipUnless(os.utime in os.supports_dir_fd,
485 "dir_fd support for utime required for this test.")
486 def test_utime_dir_fd(self):
487 def set_time(filename, ns):
488 dirname, name = os.path.split(filename)
489 dirfd = os.open(dirname, os.O_RDONLY)
490 try:
491 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
492 os.utime(name, dir_fd=dirfd, ns=ns)
493 finally:
494 os.close(dirfd)
495 self._test_utime(set_time)
496
497 def test_utime_directory(self):
498 def set_time(filename, ns):
499 # test calling os.utime() on a directory
500 os.utime(filename, ns=ns)
501 self._test_utime(set_time, filename=self.dirname)
502
503 def _test_utime_current(self, set_time):
504 # Get the system clock
505 current = time.time()
506
507 # Call os.utime() to set the timestamp to the current system clock
508 set_time(self.fname)
509
510 if not self.support_subsecond(self.fname):
511 delta = 1.0
512 else:
513 # On Windows, the usual resolution of time.time() is 15.6 ms
514 delta = 0.020
515 st = os.stat(self.fname)
516 msg = ("st_time=%r, current=%r, dt=%r"
517 % (st.st_mtime, current, st.st_mtime - current))
518 self.assertAlmostEqual(st.st_mtime, current,
519 delta=delta, msg=msg)
520
521 def test_utime_current(self):
522 def set_time(filename):
523 # Set to the current time in the new way
524 os.utime(self.fname)
525 self._test_utime_current(set_time)
526
527 def test_utime_current_old(self):
528 def set_time(filename):
529 # Set to the current time in the old explicit way.
530 os.utime(self.fname, None)
531 self._test_utime_current(set_time)
532
533 def get_file_system(self, path):
534 if sys.platform == 'win32':
535 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
536 import ctypes
537 kernel32 = ctypes.windll.kernel32
538 buf = ctypes.create_unicode_buffer("", 100)
539 ok = kernel32.GetVolumeInformationW(root, None, 0,
540 None, None, None,
541 buf, len(buf))
542 if ok:
543 return buf.value
544 # return None if the filesystem is unknown
545
546 def test_large_time(self):
547 # Many filesystems are limited to the year 2038. At least, the test
548 # pass with NTFS filesystem.
549 if self.get_file_system(self.dirname) != "NTFS":
550 self.skipTest("requires NTFS")
551
552 large = 5000000000 # some day in 2128
553 os.utime(self.fname, (large, large))
554 self.assertEqual(os.stat(self.fname).st_mtime, large)
555
556 def test_utime_invalid_arguments(self):
557 # seconds and nanoseconds parameters are mutually exclusive
558 with self.assertRaises(ValueError):
559 os.utime(self.fname, (5, 5), ns=(5, 5))
560
561
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000562from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000563
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000564class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000565 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000566 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000567
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000568 def setUp(self):
569 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000570 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000571 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000572 for key, value in self._reference().items():
573 os.environ[key] = value
574
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000575 def tearDown(self):
576 os.environ.clear()
577 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000578 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000579 os.environb.clear()
580 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000581
Christian Heimes90333392007-11-01 19:08:42 +0000582 def _reference(self):
583 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
584
585 def _empty_mapping(self):
586 os.environ.clear()
587 return os.environ
588
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000589 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300590 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000591 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000592 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300593 os.environ.update(HELLO="World")
594 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
595 value = popen.read().strip()
596 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000597
Ezio Melottic7e139b2012-09-26 20:01:34 +0300598 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000599 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300600 with os.popen(
601 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
602 it = iter(popen)
603 self.assertEqual(next(it), "line1\n")
604 self.assertEqual(next(it), "line2\n")
605 self.assertEqual(next(it), "line3\n")
606 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000607
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000608 # Verify environ keys and values from the OS are of the
609 # correct str type.
610 def test_keyvalue_types(self):
611 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000612 self.assertEqual(type(key), str)
613 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000614
Christian Heimes90333392007-11-01 19:08:42 +0000615 def test_items(self):
616 for key, value in self._reference().items():
617 self.assertEqual(os.environ.get(key), value)
618
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000619 # Issue 7310
620 def test___repr__(self):
621 """Check that the repr() of os.environ looks like environ({...})."""
622 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000623 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
624 '{!r}: {!r}'.format(key, value)
625 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000626
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000627 def test_get_exec_path(self):
628 defpath_list = os.defpath.split(os.pathsep)
629 test_path = ['/monty', '/python', '', '/flying/circus']
630 test_env = {'PATH': os.pathsep.join(test_path)}
631
632 saved_environ = os.environ
633 try:
634 os.environ = dict(test_env)
635 # Test that defaulting to os.environ works.
636 self.assertSequenceEqual(test_path, os.get_exec_path())
637 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
638 finally:
639 os.environ = saved_environ
640
641 # No PATH environment variable
642 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
643 # Empty PATH environment variable
644 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
645 # Supplied PATH environment variable
646 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
647
Victor Stinnerb745a742010-05-18 17:17:23 +0000648 if os.supports_bytes_environ:
649 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000650 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000651 # ignore BytesWarning warning
652 with warnings.catch_warnings(record=True):
653 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000654 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000655 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000656 pass
657 else:
658 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000659
660 # bytes key and/or value
661 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
662 ['abc'])
663 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
664 ['abc'])
665 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
666 ['abc'])
667
668 @unittest.skipUnless(os.supports_bytes_environ,
669 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000670 def test_environb(self):
671 # os.environ -> os.environb
672 value = 'euro\u20ac'
673 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000674 value_bytes = value.encode(sys.getfilesystemencoding(),
675 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000676 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000677 msg = "U+20AC character is not encodable to %s" % (
678 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000679 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000680 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000681 self.assertEqual(os.environ['unicode'], value)
682 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000683
684 # os.environb -> os.environ
685 value = b'\xff'
686 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000687 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000688 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000689 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000690
Charles-François Natali2966f102011-11-26 11:32:46 +0100691 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
692 # #13415).
693 @support.requires_freebsd_version(7)
694 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100695 def test_unset_error(self):
696 if sys.platform == "win32":
697 # an environment variable is limited to 32,767 characters
698 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100699 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100700 else:
701 # "=" is not allowed in a variable name
702 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100703 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100704
Victor Stinner6d101392013-04-14 16:35:04 +0200705 def test_key_type(self):
706 missing = 'missingkey'
707 self.assertNotIn(missing, os.environ)
708
Victor Stinner839e5ea2013-04-14 16:43:03 +0200709 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200710 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200711 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200712 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200713
Victor Stinner839e5ea2013-04-14 16:43:03 +0200714 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200715 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200716 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200717 self.assertTrue(cm.exception.__suppress_context__)
718
Victor Stinner6d101392013-04-14 16:35:04 +0200719
Tim Petersc4e09402003-04-25 07:11:48 +0000720class WalkTests(unittest.TestCase):
721 """Tests for os.walk()."""
722
Victor Stinner0561c532015-03-12 10:28:24 +0100723 # Wrapper to hide minor differences between os.walk and os.fwalk
724 # to tests both functions with the same code base
725 def walk(self, directory, topdown=True, follow_symlinks=False):
726 walk_it = os.walk(directory,
727 topdown=topdown,
728 followlinks=follow_symlinks)
729 for root, dirs, files in walk_it:
730 yield (root, dirs, files)
731
Charles-François Natali7372b062012-02-05 15:15:38 +0100732 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100733 join = os.path.join
Tim Petersc4e09402003-04-25 07:11:48 +0000734
735 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000736 # TESTFN/
737 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000738 # tmp1
739 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000740 # tmp2
741 # SUB11/ no kids
742 # SUB2/ a file kid and a dirsymlink kid
743 # tmp3
744 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200745 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000746 # TEST2/
747 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100748 self.walk_path = join(support.TESTFN, "TEST1")
749 self.sub1_path = join(self.walk_path, "SUB1")
750 self.sub11_path = join(self.sub1_path, "SUB11")
751 sub2_path = join(self.walk_path, "SUB2")
752 tmp1_path = join(self.walk_path, "tmp1")
753 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000754 tmp3_path = join(sub2_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100755 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000756 t2_path = join(support.TESTFN, "TEST2")
757 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200758 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000759
760 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100761 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000762 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000763 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100764
Guido van Rossumd8faa362007-04-27 19:54:29 +0000765 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000766 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000767 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
768 f.close()
769
Victor Stinner0561c532015-03-12 10:28:24 +0100770 if support.can_symlink():
771 os.symlink(os.path.abspath(t2_path), self.link_path)
772 os.symlink('broken', broken_link_path, True)
773 self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
774 else:
775 self.sub2_tree = (sub2_path, [], ["tmp3"])
776
777 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000778 # Walk top-down.
Victor Stinner0561c532015-03-12 10:28:24 +0100779 all = list(os.walk(self.walk_path))
780
Tim Petersc4e09402003-04-25 07:11:48 +0000781 self.assertEqual(len(all), 4)
782 # We can't know which order SUB1 and SUB2 will appear in.
783 # Not flipped: TESTFN, SUB1, SUB11, SUB2
784 # flipped: TESTFN, SUB2, SUB1, SUB11
785 flipped = all[0][1][0] != "SUB1"
786 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200787 all[3 - 2 * flipped][-1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100788 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
789 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
790 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
791 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000792
Victor Stinner0561c532015-03-12 10:28:24 +0100793 def test_walk_prune(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000794 # Prune the search.
795 all = []
Victor Stinner0561c532015-03-12 10:28:24 +0100796 for root, dirs, files in self.walk(self.walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000797 all.append((root, dirs, files))
798 # Don't descend into SUB1.
799 if 'SUB1' in dirs:
800 # Note that this also mutates the dirs we appended to all!
801 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000802
Victor Stinner0561c532015-03-12 10:28:24 +0100803 self.assertEqual(len(all), 2)
804 self.assertEqual(all[0],
805 (self.walk_path, ["SUB2"], ["tmp1"]))
806
807 all[1][-1].sort()
808 self.assertEqual(all[1], self.sub2_tree)
809
810 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000811 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100812 all = list(self.walk(self.walk_path, topdown=False))
813
Tim Petersc4e09402003-04-25 07:11:48 +0000814 self.assertEqual(len(all), 4)
815 # We can't know which order SUB1 and SUB2 will appear in.
816 # Not flipped: SUB11, SUB1, SUB2, TESTFN
817 # flipped: SUB2, SUB11, SUB1, TESTFN
818 flipped = all[3][1][0] != "SUB1"
819 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200820 all[2 - 2 * flipped][-1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100821 self.assertEqual(all[3],
822 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
823 self.assertEqual(all[flipped],
824 (self.sub11_path, [], []))
825 self.assertEqual(all[flipped + 1],
826 (self.sub1_path, ["SUB11"], ["tmp2"]))
827 self.assertEqual(all[2 - 2 * flipped],
828 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000829
Victor Stinner0561c532015-03-12 10:28:24 +0100830 def test_walk_symlink(self):
831 if not support.can_symlink():
832 self.skipTest("need symlink support")
833
834 # Walk, following symlinks.
835 walk_it = self.walk(self.walk_path, follow_symlinks=True)
836 for root, dirs, files in walk_it:
837 if root == self.link_path:
838 self.assertEqual(dirs, [])
839 self.assertEqual(files, ["tmp4"])
840 break
841 else:
842 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000843
844 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000845 # Tear everything down. This is a decent use for bottom-up on
846 # Windows, which doesn't have a recursive delete command. The
847 # (not so) subtlety is that rmdir will fail unless the dir's
848 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000849 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000850 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000851 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000852 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000853 dirname = os.path.join(root, name)
854 if not os.path.islink(dirname):
855 os.rmdir(dirname)
856 else:
857 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000858 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000859
Charles-François Natali7372b062012-02-05 15:15:38 +0100860
861@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
862class FwalkTests(WalkTests):
863 """Tests for os.fwalk()."""
864
Victor Stinner0561c532015-03-12 10:28:24 +0100865 def walk(self, directory, topdown=True, follow_symlinks=False):
866 walk_it = os.fwalk(directory,
867 topdown=topdown,
868 follow_symlinks=follow_symlinks)
869 for root, dirs, files, root_fd in walk_it:
870 yield (root, dirs, files)
871
872
Larry Hastingsc48fe982012-06-25 04:49:05 -0700873 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
874 """
875 compare with walk() results.
876 """
Larry Hastingsb4038062012-07-15 10:57:38 -0700877 walk_kwargs = walk_kwargs.copy()
878 fwalk_kwargs = fwalk_kwargs.copy()
879 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
880 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
881 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -0700882
Charles-François Natali7372b062012-02-05 15:15:38 +0100883 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -0700884 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100885 expected[root] = (set(dirs), set(files))
886
Larry Hastingsc48fe982012-06-25 04:49:05 -0700887 for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100888 self.assertIn(root, expected)
889 self.assertEqual(expected[root], (set(dirs), set(files)))
890
Larry Hastingsc48fe982012-06-25 04:49:05 -0700891 def test_compare_to_walk(self):
892 kwargs = {'top': support.TESTFN}
893 self._compare_to_walk(kwargs, kwargs)
894
Charles-François Natali7372b062012-02-05 15:15:38 +0100895 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -0700896 try:
897 fd = os.open(".", os.O_RDONLY)
898 walk_kwargs = {'top': support.TESTFN}
899 fwalk_kwargs = walk_kwargs.copy()
900 fwalk_kwargs['dir_fd'] = fd
901 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
902 finally:
903 os.close(fd)
904
905 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +0100906 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -0700907 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
908 args = support.TESTFN, topdown, None
909 for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +0100910 # check that the FD is valid
911 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700912 # redundant check
913 os.stat(rootfd)
914 # check that listdir() returns consistent information
915 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100916
917 def test_fd_leak(self):
918 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
919 # we both check that calling fwalk() a large number of times doesn't
920 # yield EMFILE, and that the minimum allocated FD hasn't changed.
921 minfd = os.dup(1)
922 os.close(minfd)
923 for i in range(256):
924 for x in os.fwalk(support.TESTFN):
925 pass
926 newfd = os.dup(1)
927 self.addCleanup(os.close, newfd)
928 self.assertEqual(newfd, minfd)
929
930 def tearDown(self):
931 # cleanup
932 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
933 for name in files:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700934 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100935 for name in dirs:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700936 st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
Larry Hastingsb698d8e2012-06-23 16:55:07 -0700937 if stat.S_ISDIR(st.st_mode):
938 os.rmdir(name, dir_fd=rootfd)
939 else:
940 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100941 os.rmdir(support.TESTFN)
942
943
Guido van Rossume7ba4952007-06-06 23:52:48 +0000944class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000945 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000946 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000947
948 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000949 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000950 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
951 os.makedirs(path) # Should work
952 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
953 os.makedirs(path)
954
955 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000956 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000957 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
958 os.makedirs(path)
959 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
960 'dir5', 'dir6')
961 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000962
Terry Reedy5a22b652010-12-02 07:05:56 +0000963 def test_exist_ok_existing_directory(self):
964 path = os.path.join(support.TESTFN, 'dir1')
965 mode = 0o777
966 old_mask = os.umask(0o022)
967 os.makedirs(path, mode)
968 self.assertRaises(OSError, os.makedirs, path, mode)
969 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -0400970 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +0000971 os.makedirs(path, mode=mode, exist_ok=True)
972 os.umask(old_mask)
973
Terry Jan Reedy4a0b6f72013-08-10 20:58:59 -0400974 @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
Larry Hastingsa27b83a2013-08-08 00:19:50 -0700975 def test_chown_uid_gid_arguments_must_be_index(self):
976 stat = os.stat(support.TESTFN)
977 uid = stat.st_uid
978 gid = stat.st_gid
979 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
980 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
981 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
982 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
983 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
984
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700985 def test_exist_ok_s_isgid_directory(self):
986 path = os.path.join(support.TESTFN, 'dir1')
987 S_ISGID = stat.S_ISGID
988 mode = 0o777
989 old_mask = os.umask(0o022)
990 try:
991 existing_testfn_mode = stat.S_IMODE(
992 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -0700993 try:
994 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -0700995 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -0700996 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700997 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
998 raise unittest.SkipTest('No support for S_ISGID dir mode.')
999 # The os should apply S_ISGID from the parent dir for us, but
1000 # this test need not depend on that behavior. Be explicit.
1001 os.makedirs(path, mode | S_ISGID)
1002 # http://bugs.python.org/issue14992
1003 # Should not fail when the bit is already set.
1004 os.makedirs(path, mode, exist_ok=True)
1005 # remove the bit.
1006 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001007 # May work even when the bit is not already set when demanded.
1008 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001009 finally:
1010 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001011
1012 def test_exist_ok_existing_regular_file(self):
1013 base = support.TESTFN
1014 path = os.path.join(support.TESTFN, 'dir1')
1015 f = open(path, 'w')
1016 f.write('abc')
1017 f.close()
1018 self.assertRaises(OSError, os.makedirs, path)
1019 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1020 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1021 os.remove(path)
1022
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001023 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001024 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001025 'dir4', 'dir5', 'dir6')
1026 # If the tests failed, the bottom-most directory ('../dir6')
1027 # may not have been created, so we look for the outermost directory
1028 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001029 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001030 path = os.path.dirname(path)
1031
1032 os.removedirs(path)
1033
Andrew Svetlov405faed2012-12-25 12:18:09 +02001034
1035class RemoveDirsTests(unittest.TestCase):
1036 def setUp(self):
1037 os.makedirs(support.TESTFN)
1038
1039 def tearDown(self):
1040 support.rmtree(support.TESTFN)
1041
1042 def test_remove_all(self):
1043 dira = os.path.join(support.TESTFN, 'dira')
1044 os.mkdir(dira)
1045 dirb = os.path.join(dira, 'dirb')
1046 os.mkdir(dirb)
1047 os.removedirs(dirb)
1048 self.assertFalse(os.path.exists(dirb))
1049 self.assertFalse(os.path.exists(dira))
1050 self.assertFalse(os.path.exists(support.TESTFN))
1051
1052 def test_remove_partial(self):
1053 dira = os.path.join(support.TESTFN, 'dira')
1054 os.mkdir(dira)
1055 dirb = os.path.join(dira, 'dirb')
1056 os.mkdir(dirb)
1057 with open(os.path.join(dira, 'file.txt'), 'w') as f:
1058 f.write('text')
1059 os.removedirs(dirb)
1060 self.assertFalse(os.path.exists(dirb))
1061 self.assertTrue(os.path.exists(dira))
1062 self.assertTrue(os.path.exists(support.TESTFN))
1063
1064 def test_remove_nothing(self):
1065 dira = os.path.join(support.TESTFN, 'dira')
1066 os.mkdir(dira)
1067 dirb = os.path.join(dira, 'dirb')
1068 os.mkdir(dirb)
1069 with open(os.path.join(dirb, 'file.txt'), 'w') as f:
1070 f.write('text')
1071 with self.assertRaises(OSError):
1072 os.removedirs(dirb)
1073 self.assertTrue(os.path.exists(dirb))
1074 self.assertTrue(os.path.exists(dira))
1075 self.assertTrue(os.path.exists(support.TESTFN))
1076
1077
Guido van Rossume7ba4952007-06-06 23:52:48 +00001078class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001079 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +02001080 with open(os.devnull, 'wb') as f:
1081 f.write(b'hello')
1082 f.close()
1083 with open(os.devnull, 'rb') as f:
1084 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001085
Andrew Svetlov405faed2012-12-25 12:18:09 +02001086
Guido van Rossume7ba4952007-06-06 23:52:48 +00001087class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001088 def test_urandom_length(self):
1089 self.assertEqual(len(os.urandom(0)), 0)
1090 self.assertEqual(len(os.urandom(1)), 1)
1091 self.assertEqual(len(os.urandom(10)), 10)
1092 self.assertEqual(len(os.urandom(100)), 100)
1093 self.assertEqual(len(os.urandom(1000)), 1000)
1094
1095 def test_urandom_value(self):
1096 data1 = os.urandom(16)
1097 data2 = os.urandom(16)
1098 self.assertNotEqual(data1, data2)
1099
1100 def get_urandom_subprocess(self, count):
1101 code = '\n'.join((
1102 'import os, sys',
1103 'data = os.urandom(%s)' % count,
1104 'sys.stdout.buffer.write(data)',
1105 'sys.stdout.buffer.flush()'))
1106 out = assert_python_ok('-c', code)
1107 stdout = out[1]
1108 self.assertEqual(len(stdout), 16)
1109 return stdout
1110
1111 def test_urandom_subprocess(self):
1112 data1 = self.get_urandom_subprocess(16)
1113 data2 = self.get_urandom_subprocess(16)
1114 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001115
Victor Stinnerfe02e392014-12-21 01:16:38 +01001116
1117HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)
1118
1119@unittest.skipIf(HAVE_GETENTROPY,
1120 "getentropy() does not use a file descriptor")
1121class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001122 @unittest.skipUnless(resource, "test requires the resource module")
1123 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001124 # Check urandom() failing when it is not able to open /dev/random.
1125 # We spawn a new process to make the test more robust (if getrlimit()
1126 # failed to restore the file descriptor limit after this, the whole
1127 # test suite would crash; this actually happened on the OS X Tiger
1128 # buildbot).
1129 code = """if 1:
1130 import errno
1131 import os
1132 import resource
1133
1134 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1135 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1136 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001137 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001138 except OSError as e:
1139 assert e.errno == errno.EMFILE, e.errno
1140 else:
1141 raise AssertionError("OSError not raised")
1142 """
1143 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001144
Antoine Pitroue472aea2014-04-26 14:33:03 +02001145 def test_urandom_fd_closed(self):
1146 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1147 # closed.
1148 code = """if 1:
1149 import os
1150 import sys
1151 os.urandom(4)
1152 os.closerange(3, 256)
1153 sys.stdout.buffer.write(os.urandom(4))
1154 """
1155 rc, out, err = assert_python_ok('-Sc', code)
1156
1157 def test_urandom_fd_reopened(self):
1158 # Issue #21207: urandom() should detect its fd to /dev/urandom
1159 # changed to something else, and reopen it.
1160 with open(support.TESTFN, 'wb') as f:
1161 f.write(b"x" * 256)
1162 self.addCleanup(os.unlink, support.TESTFN)
1163 code = """if 1:
1164 import os
1165 import sys
1166 os.urandom(4)
1167 for fd in range(3, 256):
1168 try:
1169 os.close(fd)
1170 except OSError:
1171 pass
1172 else:
1173 # Found the urandom fd (XXX hopefully)
1174 break
1175 os.closerange(3, 256)
1176 with open({TESTFN!r}, 'rb') as f:
1177 os.dup2(f.fileno(), fd)
1178 sys.stdout.buffer.write(os.urandom(4))
1179 sys.stdout.buffer.write(os.urandom(4))
1180 """.format(TESTFN=support.TESTFN)
1181 rc, out, err = assert_python_ok('-Sc', code)
1182 self.assertEqual(len(out), 8)
1183 self.assertNotEqual(out[0:4], out[4:8])
1184 rc, out2, err2 = assert_python_ok('-Sc', code)
1185 self.assertEqual(len(out2), 8)
1186 self.assertNotEqual(out2, out)
1187
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001188
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001189@contextlib.contextmanager
1190def _execvpe_mockup(defpath=None):
1191 """
1192 Stubs out execv and execve functions when used as context manager.
1193 Records exec calls. The mock execv and execve functions always raise an
1194 exception as they would normally never return.
1195 """
1196 # A list of tuples containing (function name, first arg, args)
1197 # of calls to execv or execve that have been made.
1198 calls = []
1199
1200 def mock_execv(name, *args):
1201 calls.append(('execv', name, args))
1202 raise RuntimeError("execv called")
1203
1204 def mock_execve(name, *args):
1205 calls.append(('execve', name, args))
1206 raise OSError(errno.ENOTDIR, "execve called")
1207
1208 try:
1209 orig_execv = os.execv
1210 orig_execve = os.execve
1211 orig_defpath = os.defpath
1212 os.execv = mock_execv
1213 os.execve = mock_execve
1214 if defpath is not None:
1215 os.defpath = defpath
1216 yield calls
1217 finally:
1218 os.execv = orig_execv
1219 os.execve = orig_execve
1220 os.defpath = orig_defpath
1221
Guido van Rossume7ba4952007-06-06 23:52:48 +00001222class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001223 @unittest.skipIf(USING_LINUXTHREADS,
1224 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001225 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001226 self.assertRaises(OSError, os.execvpe, 'no such app-',
1227 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001228
Thomas Heller6790d602007-08-30 17:15:14 +00001229 def test_execvpe_with_bad_arglist(self):
1230 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1231
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001232 @unittest.skipUnless(hasattr(os, '_execvpe'),
1233 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001234 def _test_internal_execvpe(self, test_type):
1235 program_path = os.sep + 'absolutepath'
1236 if test_type is bytes:
1237 program = b'executable'
1238 fullpath = os.path.join(os.fsencode(program_path), program)
1239 native_fullpath = fullpath
1240 arguments = [b'progname', 'arg1', 'arg2']
1241 else:
1242 program = 'executable'
1243 arguments = ['progname', 'arg1', 'arg2']
1244 fullpath = os.path.join(program_path, program)
1245 if os.name != "nt":
1246 native_fullpath = os.fsencode(fullpath)
1247 else:
1248 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001249 env = {'spam': 'beans'}
1250
Victor Stinnerb745a742010-05-18 17:17:23 +00001251 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001252 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001253 self.assertRaises(RuntimeError,
1254 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001255 self.assertEqual(len(calls), 1)
1256 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1257
Victor Stinnerb745a742010-05-18 17:17:23 +00001258 # test os._execvpe() with a relative path:
1259 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001260 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001261 self.assertRaises(OSError,
1262 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001263 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001264 self.assertSequenceEqual(calls[0],
1265 ('execve', native_fullpath, (arguments, env)))
1266
1267 # test os._execvpe() with a relative path:
1268 # os.get_exec_path() reads the 'PATH' variable
1269 with _execvpe_mockup() as calls:
1270 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001271 if test_type is bytes:
1272 env_path[b'PATH'] = program_path
1273 else:
1274 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001275 self.assertRaises(OSError,
1276 os._execvpe, program, arguments, env=env_path)
1277 self.assertEqual(len(calls), 1)
1278 self.assertSequenceEqual(calls[0],
1279 ('execve', native_fullpath, (arguments, env_path)))
1280
1281 def test_internal_execvpe_str(self):
1282 self._test_internal_execvpe(str)
1283 if os.name != "nt":
1284 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001285
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001286
Serhiy Storchaka43767632013-11-03 21:31:38 +02001287@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001288class Win32ErrorTests(unittest.TestCase):
1289 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001290 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001291
1292 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001293 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001294
1295 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001296 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001297
1298 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001299 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001300 try:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001301 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Benjamin Petersonf91df042009-02-13 02:50:59 +00001302 finally:
1303 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001304 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001305
1306 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001307 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001308
Thomas Wouters477c8d52006-05-27 19:21:47 +00001309 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001310 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001311
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001312class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001313 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001314 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1315 #singles.append("close")
1316 #We omit close because it doesn'r raise an exception on some platforms
1317 def get_single(f):
1318 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001319 if hasattr(os, f):
1320 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001321 return helper
1322 for f in singles:
1323 locals()["test_"+f] = get_single(f)
1324
Benjamin Peterson7522c742009-01-19 21:00:09 +00001325 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001326 try:
1327 f(support.make_bad_fd(), *args)
1328 except OSError as e:
1329 self.assertEqual(e.errno, errno.EBADF)
1330 else:
1331 self.fail("%r didn't raise a OSError with a bad file descriptor"
1332 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001333
Serhiy Storchaka43767632013-11-03 21:31:38 +02001334 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001335 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001336 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001337
Serhiy Storchaka43767632013-11-03 21:31:38 +02001338 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001339 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001340 fd = support.make_bad_fd()
1341 # Make sure none of the descriptors we are about to close are
1342 # currently valid (issue 6542).
1343 for i in range(10):
1344 try: os.fstat(fd+i)
1345 except OSError:
1346 pass
1347 else:
1348 break
1349 if i < 2:
1350 raise unittest.SkipTest(
1351 "Unable to acquire a range of invalid file descriptors")
1352 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001353
Serhiy Storchaka43767632013-11-03 21:31:38 +02001354 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001355 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001356 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001357
Serhiy Storchaka43767632013-11-03 21:31:38 +02001358 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001359 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001360 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001361
Serhiy Storchaka43767632013-11-03 21:31:38 +02001362 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001363 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001364 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001365
Serhiy Storchaka43767632013-11-03 21:31:38 +02001366 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001367 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001368 self.check(os.pathconf, "PC_NAME_MAX")
1369 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001370
Serhiy Storchaka43767632013-11-03 21:31:38 +02001371 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001372 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001373 self.check(os.truncate, 0)
1374 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001375
Serhiy Storchaka43767632013-11-03 21:31:38 +02001376 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001377 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001378 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001379
Serhiy Storchaka43767632013-11-03 21:31:38 +02001380 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001381 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001382 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001383
Victor Stinner57ddf782014-01-08 15:21:28 +01001384 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1385 def test_readv(self):
1386 buf = bytearray(10)
1387 self.check(os.readv, [buf])
1388
Serhiy Storchaka43767632013-11-03 21:31:38 +02001389 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001390 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001391 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001392
Serhiy Storchaka43767632013-11-03 21:31:38 +02001393 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001394 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001395 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001396
Victor Stinner57ddf782014-01-08 15:21:28 +01001397 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1398 def test_writev(self):
1399 self.check(os.writev, [b'abc'])
1400
Brian Curtin1b9df392010-11-24 20:24:31 +00001401
1402class LinkTests(unittest.TestCase):
1403 def setUp(self):
1404 self.file1 = support.TESTFN
1405 self.file2 = os.path.join(support.TESTFN + "2")
1406
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001407 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001408 for file in (self.file1, self.file2):
1409 if os.path.exists(file):
1410 os.unlink(file)
1411
Brian Curtin1b9df392010-11-24 20:24:31 +00001412 def _test_link(self, file1, file2):
1413 with open(file1, "w") as f1:
1414 f1.write("test")
1415
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001416 with warnings.catch_warnings():
1417 warnings.simplefilter("ignore", DeprecationWarning)
1418 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001419 with open(file1, "r") as f1, open(file2, "r") as f2:
1420 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1421
1422 def test_link(self):
1423 self._test_link(self.file1, self.file2)
1424
1425 def test_link_bytes(self):
1426 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1427 bytes(self.file2, sys.getfilesystemencoding()))
1428
Brian Curtinf498b752010-11-30 15:54:04 +00001429 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001430 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001431 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001432 except UnicodeError:
1433 raise unittest.SkipTest("Unable to encode for this platform.")
1434
Brian Curtinf498b752010-11-30 15:54:04 +00001435 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001436 self.file2 = self.file1 + "2"
1437 self._test_link(self.file1, self.file2)
1438
Serhiy Storchaka43767632013-11-03 21:31:38 +02001439@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1440class PosixUidGidTests(unittest.TestCase):
1441 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1442 def test_setuid(self):
1443 if os.getuid() != 0:
1444 self.assertRaises(OSError, os.setuid, 0)
1445 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001446
Serhiy Storchaka43767632013-11-03 21:31:38 +02001447 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1448 def test_setgid(self):
1449 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1450 self.assertRaises(OSError, os.setgid, 0)
1451 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001452
Serhiy Storchaka43767632013-11-03 21:31:38 +02001453 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1454 def test_seteuid(self):
1455 if os.getuid() != 0:
1456 self.assertRaises(OSError, os.seteuid, 0)
1457 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001458
Serhiy Storchaka43767632013-11-03 21:31:38 +02001459 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1460 def test_setegid(self):
1461 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1462 self.assertRaises(OSError, os.setegid, 0)
1463 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001464
Serhiy Storchaka43767632013-11-03 21:31:38 +02001465 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1466 def test_setreuid(self):
1467 if os.getuid() != 0:
1468 self.assertRaises(OSError, os.setreuid, 0, 0)
1469 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1470 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001471
Serhiy Storchaka43767632013-11-03 21:31:38 +02001472 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1473 def test_setreuid_neg1(self):
1474 # Needs to accept -1. We run this in a subprocess to avoid
1475 # altering the test runner's process state (issue8045).
1476 subprocess.check_call([
1477 sys.executable, '-c',
1478 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001479
Serhiy Storchaka43767632013-11-03 21:31:38 +02001480 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1481 def test_setregid(self):
1482 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1483 self.assertRaises(OSError, os.setregid, 0, 0)
1484 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1485 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001486
Serhiy Storchaka43767632013-11-03 21:31:38 +02001487 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1488 def test_setregid_neg1(self):
1489 # Needs to accept -1. We run this in a subprocess to avoid
1490 # altering the test runner's process state (issue8045).
1491 subprocess.check_call([
1492 sys.executable, '-c',
1493 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001494
Serhiy Storchaka43767632013-11-03 21:31:38 +02001495@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1496class Pep383Tests(unittest.TestCase):
1497 def setUp(self):
1498 if support.TESTFN_UNENCODABLE:
1499 self.dir = support.TESTFN_UNENCODABLE
1500 elif support.TESTFN_NONASCII:
1501 self.dir = support.TESTFN_NONASCII
1502 else:
1503 self.dir = support.TESTFN
1504 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001505
Serhiy Storchaka43767632013-11-03 21:31:38 +02001506 bytesfn = []
1507 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001508 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001509 fn = os.fsencode(fn)
1510 except UnicodeEncodeError:
1511 return
1512 bytesfn.append(fn)
1513 add_filename(support.TESTFN_UNICODE)
1514 if support.TESTFN_UNENCODABLE:
1515 add_filename(support.TESTFN_UNENCODABLE)
1516 if support.TESTFN_NONASCII:
1517 add_filename(support.TESTFN_NONASCII)
1518 if not bytesfn:
1519 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001520
Serhiy Storchaka43767632013-11-03 21:31:38 +02001521 self.unicodefn = set()
1522 os.mkdir(self.dir)
1523 try:
1524 for fn in bytesfn:
1525 support.create_empty_file(os.path.join(self.bdir, fn))
1526 fn = os.fsdecode(fn)
1527 if fn in self.unicodefn:
1528 raise ValueError("duplicate filename")
1529 self.unicodefn.add(fn)
1530 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001531 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001532 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001533
Serhiy Storchaka43767632013-11-03 21:31:38 +02001534 def tearDown(self):
1535 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001536
Serhiy Storchaka43767632013-11-03 21:31:38 +02001537 def test_listdir(self):
1538 expected = self.unicodefn
1539 found = set(os.listdir(self.dir))
1540 self.assertEqual(found, expected)
1541 # test listdir without arguments
1542 current_directory = os.getcwd()
1543 try:
1544 os.chdir(os.sep)
1545 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1546 finally:
1547 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001548
Serhiy Storchaka43767632013-11-03 21:31:38 +02001549 def test_open(self):
1550 for fn in self.unicodefn:
1551 f = open(os.path.join(self.dir, fn), 'rb')
1552 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001553
Serhiy Storchaka43767632013-11-03 21:31:38 +02001554 @unittest.skipUnless(hasattr(os, 'statvfs'),
1555 "need os.statvfs()")
1556 def test_statvfs(self):
1557 # issue #9645
1558 for fn in self.unicodefn:
1559 # should not fail with file not found error
1560 fullname = os.path.join(self.dir, fn)
1561 os.statvfs(fullname)
1562
1563 def test_stat(self):
1564 for fn in self.unicodefn:
1565 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001566
Brian Curtineb24d742010-04-12 17:16:38 +00001567@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1568class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001569 def _kill(self, sig):
1570 # Start sys.executable as a subprocess and communicate from the
1571 # subprocess to the parent that the interpreter is ready. When it
1572 # becomes ready, send *sig* via os.kill to the subprocess and check
1573 # that the return code is equal to *sig*.
1574 import ctypes
1575 from ctypes import wintypes
1576 import msvcrt
1577
1578 # Since we can't access the contents of the process' stdout until the
1579 # process has exited, use PeekNamedPipe to see what's inside stdout
1580 # without waiting. This is done so we can tell that the interpreter
1581 # is started and running at a point where it could handle a signal.
1582 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1583 PeekNamedPipe.restype = wintypes.BOOL
1584 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1585 ctypes.POINTER(ctypes.c_char), # stdout buf
1586 wintypes.DWORD, # Buffer size
1587 ctypes.POINTER(wintypes.DWORD), # bytes read
1588 ctypes.POINTER(wintypes.DWORD), # bytes avail
1589 ctypes.POINTER(wintypes.DWORD)) # bytes left
1590 msg = "running"
1591 proc = subprocess.Popen([sys.executable, "-c",
1592 "import sys;"
1593 "sys.stdout.write('{}');"
1594 "sys.stdout.flush();"
1595 "input()".format(msg)],
1596 stdout=subprocess.PIPE,
1597 stderr=subprocess.PIPE,
1598 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001599 self.addCleanup(proc.stdout.close)
1600 self.addCleanup(proc.stderr.close)
1601 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001602
1603 count, max = 0, 100
1604 while count < max and proc.poll() is None:
1605 # Create a string buffer to store the result of stdout from the pipe
1606 buf = ctypes.create_string_buffer(len(msg))
1607 # Obtain the text currently in proc.stdout
1608 # Bytes read/avail/left are left as NULL and unused
1609 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1610 buf, ctypes.sizeof(buf), None, None, None)
1611 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1612 if buf.value:
1613 self.assertEqual(msg, buf.value.decode())
1614 break
1615 time.sleep(0.1)
1616 count += 1
1617 else:
1618 self.fail("Did not receive communication from the subprocess")
1619
Brian Curtineb24d742010-04-12 17:16:38 +00001620 os.kill(proc.pid, sig)
1621 self.assertEqual(proc.wait(), sig)
1622
1623 def test_kill_sigterm(self):
1624 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001625 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001626
1627 def test_kill_int(self):
1628 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001629 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001630
1631 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001632 tagname = "test_os_%s" % uuid.uuid1()
1633 m = mmap.mmap(-1, 1, tagname)
1634 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001635 # Run a script which has console control handling enabled.
1636 proc = subprocess.Popen([sys.executable,
1637 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001638 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001639 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1640 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001641 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001642 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001643 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001644 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001645 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001646 count += 1
1647 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001648 # Forcefully kill the process if we weren't able to signal it.
1649 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001650 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001651 os.kill(proc.pid, event)
1652 # proc.send_signal(event) could also be done here.
1653 # Allow time for the signal to be passed and the process to exit.
1654 time.sleep(0.5)
1655 if not proc.poll():
1656 # Forcefully kill the process if we weren't able to signal it.
1657 os.kill(proc.pid, signal.SIGINT)
1658 self.fail("subprocess did not stop on {}".format(name))
1659
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001660 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00001661 def test_CTRL_C_EVENT(self):
1662 from ctypes import wintypes
1663 import ctypes
1664
1665 # Make a NULL value by creating a pointer with no argument.
1666 NULL = ctypes.POINTER(ctypes.c_int)()
1667 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1668 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1669 wintypes.BOOL)
1670 SetConsoleCtrlHandler.restype = wintypes.BOOL
1671
1672 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001673 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00001674 # by subprocesses.
1675 SetConsoleCtrlHandler(NULL, 0)
1676
1677 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1678
1679 def test_CTRL_BREAK_EVENT(self):
1680 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1681
1682
Brian Curtind40e6f72010-07-08 21:39:08 +00001683@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001684class Win32ListdirTests(unittest.TestCase):
1685 """Test listdir on Windows."""
1686
1687 def setUp(self):
1688 self.created_paths = []
1689 for i in range(2):
1690 dir_name = 'SUB%d' % i
1691 dir_path = os.path.join(support.TESTFN, dir_name)
1692 file_name = 'FILE%d' % i
1693 file_path = os.path.join(support.TESTFN, file_name)
1694 os.makedirs(dir_path)
1695 with open(file_path, 'w') as f:
1696 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1697 self.created_paths.extend([dir_name, file_name])
1698 self.created_paths.sort()
1699
1700 def tearDown(self):
1701 shutil.rmtree(support.TESTFN)
1702
1703 def test_listdir_no_extended_path(self):
1704 """Test when the path is not an "extended" path."""
1705 # unicode
1706 self.assertEqual(
1707 sorted(os.listdir(support.TESTFN)),
1708 self.created_paths)
1709 # bytes
1710 self.assertEqual(
1711 sorted(os.listdir(os.fsencode(support.TESTFN))),
1712 [os.fsencode(path) for path in self.created_paths])
1713
1714 def test_listdir_extended_path(self):
1715 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01001716 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01001717 # unicode
1718 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
1719 self.assertEqual(
1720 sorted(os.listdir(path)),
1721 self.created_paths)
1722 # bytes
1723 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
1724 self.assertEqual(
1725 sorted(os.listdir(path)),
1726 [os.fsencode(path) for path in self.created_paths])
1727
1728
1729@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001730@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001731class Win32SymlinkTests(unittest.TestCase):
1732 filelink = 'filelinktest'
1733 filelink_target = os.path.abspath(__file__)
1734 dirlink = 'dirlinktest'
1735 dirlink_target = os.path.dirname(filelink_target)
1736 missing_link = 'missing link'
1737
1738 def setUp(self):
1739 assert os.path.exists(self.dirlink_target)
1740 assert os.path.exists(self.filelink_target)
1741 assert not os.path.exists(self.dirlink)
1742 assert not os.path.exists(self.filelink)
1743 assert not os.path.exists(self.missing_link)
1744
1745 def tearDown(self):
1746 if os.path.exists(self.filelink):
1747 os.remove(self.filelink)
1748 if os.path.exists(self.dirlink):
1749 os.rmdir(self.dirlink)
1750 if os.path.lexists(self.missing_link):
1751 os.remove(self.missing_link)
1752
1753 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001754 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001755 self.assertTrue(os.path.exists(self.dirlink))
1756 self.assertTrue(os.path.isdir(self.dirlink))
1757 self.assertTrue(os.path.islink(self.dirlink))
1758 self.check_stat(self.dirlink, self.dirlink_target)
1759
1760 def test_file_link(self):
1761 os.symlink(self.filelink_target, self.filelink)
1762 self.assertTrue(os.path.exists(self.filelink))
1763 self.assertTrue(os.path.isfile(self.filelink))
1764 self.assertTrue(os.path.islink(self.filelink))
1765 self.check_stat(self.filelink, self.filelink_target)
1766
1767 def _create_missing_dir_link(self):
1768 'Create a "directory" link to a non-existent target'
1769 linkname = self.missing_link
1770 if os.path.lexists(linkname):
1771 os.remove(linkname)
1772 target = r'c:\\target does not exist.29r3c740'
1773 assert not os.path.exists(target)
1774 target_is_dir = True
1775 os.symlink(target, linkname, target_is_dir)
1776
1777 def test_remove_directory_link_to_missing_target(self):
1778 self._create_missing_dir_link()
1779 # For compatibility with Unix, os.remove will check the
1780 # directory status and call RemoveDirectory if the symlink
1781 # was created with target_is_dir==True.
1782 os.remove(self.missing_link)
1783
1784 @unittest.skip("currently fails; consider for improvement")
1785 def test_isdir_on_directory_link_to_missing_target(self):
1786 self._create_missing_dir_link()
1787 # consider having isdir return true for directory links
1788 self.assertTrue(os.path.isdir(self.missing_link))
1789
1790 @unittest.skip("currently fails; consider for improvement")
1791 def test_rmdir_on_directory_link_to_missing_target(self):
1792 self._create_missing_dir_link()
1793 # consider allowing rmdir to remove directory links
1794 os.rmdir(self.missing_link)
1795
1796 def check_stat(self, link, target):
1797 self.assertEqual(os.stat(link), os.stat(target))
1798 self.assertNotEqual(os.lstat(link), os.stat(link))
1799
Brian Curtind25aef52011-06-13 15:16:04 -05001800 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001801 with warnings.catch_warnings():
1802 warnings.simplefilter("ignore", DeprecationWarning)
1803 self.assertEqual(os.stat(bytes_link), os.stat(target))
1804 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001805
1806 def test_12084(self):
1807 level1 = os.path.abspath(support.TESTFN)
1808 level2 = os.path.join(level1, "level2")
1809 level3 = os.path.join(level2, "level3")
1810 try:
1811 os.mkdir(level1)
1812 os.mkdir(level2)
1813 os.mkdir(level3)
1814
1815 file1 = os.path.abspath(os.path.join(level1, "file1"))
1816
1817 with open(file1, "w") as f:
1818 f.write("file1")
1819
1820 orig_dir = os.getcwd()
1821 try:
1822 os.chdir(level2)
1823 link = os.path.join(level2, "link")
1824 os.symlink(os.path.relpath(file1), "link")
1825 self.assertIn("link", os.listdir(os.getcwd()))
1826
1827 # Check os.stat calls from the same dir as the link
1828 self.assertEqual(os.stat(file1), os.stat("link"))
1829
1830 # Check os.stat calls from a dir below the link
1831 os.chdir(level1)
1832 self.assertEqual(os.stat(file1),
1833 os.stat(os.path.relpath(link)))
1834
1835 # Check os.stat calls from a dir above the link
1836 os.chdir(level3)
1837 self.assertEqual(os.stat(file1),
1838 os.stat(os.path.relpath(link)))
1839 finally:
1840 os.chdir(orig_dir)
1841 except OSError as err:
1842 self.fail(err)
1843 finally:
1844 os.remove(file1)
1845 shutil.rmtree(level1)
1846
Brian Curtind40e6f72010-07-08 21:39:08 +00001847
Jason R. Coombs3a092862013-05-27 23:21:28 -04001848@support.skip_unless_symlink
1849class NonLocalSymlinkTests(unittest.TestCase):
1850
1851 def setUp(self):
1852 """
1853 Create this structure:
1854
1855 base
1856 \___ some_dir
1857 """
1858 os.makedirs('base/some_dir')
1859
1860 def tearDown(self):
1861 shutil.rmtree('base')
1862
1863 def test_directory_link_nonlocal(self):
1864 """
1865 The symlink target should resolve relative to the link, not relative
1866 to the current directory.
1867
1868 Then, link base/some_link -> base/some_dir and ensure that some_link
1869 is resolved as a directory.
1870
1871 In issue13772, it was discovered that directory detection failed if
1872 the symlink target was not specified relative to the current
1873 directory, which was a defect in the implementation.
1874 """
1875 src = os.path.join('base', 'some_link')
1876 os.symlink('some_dir', src)
1877 assert os.path.isdir(src)
1878
1879
Victor Stinnere8d51452010-08-19 01:05:19 +00001880class FSEncodingTests(unittest.TestCase):
1881 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001882 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1883 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001884
Victor Stinnere8d51452010-08-19 01:05:19 +00001885 def test_identity(self):
1886 # assert fsdecode(fsencode(x)) == x
1887 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1888 try:
1889 bytesfn = os.fsencode(fn)
1890 except UnicodeEncodeError:
1891 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001892 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001893
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001894
Brett Cannonefb00c02012-02-29 18:31:31 -05001895
1896class DeviceEncodingTests(unittest.TestCase):
1897
1898 def test_bad_fd(self):
1899 # Return None when an fd doesn't actually exist.
1900 self.assertIsNone(os.device_encoding(123456))
1901
Philip Jenveye308b7c2012-02-29 16:16:15 -08001902 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
1903 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08001904 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05001905 def test_device_encoding(self):
1906 encoding = os.device_encoding(0)
1907 self.assertIsNotNone(encoding)
1908 self.assertTrue(codecs.lookup(encoding))
1909
1910
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001911class PidTests(unittest.TestCase):
1912 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1913 def test_getppid(self):
1914 p = subprocess.Popen([sys.executable, '-c',
1915 'import os; print(os.getppid())'],
1916 stdout=subprocess.PIPE)
1917 stdout, _ = p.communicate()
1918 # We are the parent of our subprocess
1919 self.assertEqual(int(stdout), os.getpid())
1920
1921
Brian Curtin0151b8e2010-09-24 13:43:43 +00001922# The introduction of this TestCase caused at least two different errors on
1923# *nix buildbots. Temporarily skip this to let the buildbots move along.
1924@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001925@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1926class LoginTests(unittest.TestCase):
1927 def test_getlogin(self):
1928 user_name = os.getlogin()
1929 self.assertNotEqual(len(user_name), 0)
1930
1931
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001932@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1933 "needs os.getpriority and os.setpriority")
1934class ProgramPriorityTests(unittest.TestCase):
1935 """Tests for os.getpriority() and os.setpriority()."""
1936
1937 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001938
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001939 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1940 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1941 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001942 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1943 if base >= 19 and new_prio <= 19:
1944 raise unittest.SkipTest(
1945 "unable to reliably test setpriority at current nice level of %s" % base)
1946 else:
1947 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001948 finally:
1949 try:
1950 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1951 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001952 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001953 raise
1954
1955
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001956if threading is not None:
1957 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001958
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001959 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001960
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001961 def __init__(self, conn):
1962 asynchat.async_chat.__init__(self, conn)
1963 self.in_buffer = []
1964 self.closed = False
1965 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001966
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001967 def handle_read(self):
1968 data = self.recv(4096)
1969 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001970
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001971 def get_data(self):
1972 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001973
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001974 def handle_close(self):
1975 self.close()
1976 self.closed = True
1977
1978 def handle_error(self):
1979 raise
1980
1981 def __init__(self, address):
1982 threading.Thread.__init__(self)
1983 asyncore.dispatcher.__init__(self)
1984 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1985 self.bind(address)
1986 self.listen(5)
1987 self.host, self.port = self.socket.getsockname()[:2]
1988 self.handler_instance = None
1989 self._active = False
1990 self._active_lock = threading.Lock()
1991
1992 # --- public API
1993
1994 @property
1995 def running(self):
1996 return self._active
1997
1998 def start(self):
1999 assert not self.running
2000 self.__flag = threading.Event()
2001 threading.Thread.start(self)
2002 self.__flag.wait()
2003
2004 def stop(self):
2005 assert self.running
2006 self._active = False
2007 self.join()
2008
2009 def wait(self):
2010 # wait for handler connection to be closed, then stop the server
2011 while not getattr(self.handler_instance, "closed", False):
2012 time.sleep(0.001)
2013 self.stop()
2014
2015 # --- internals
2016
2017 def run(self):
2018 self._active = True
2019 self.__flag.set()
2020 while self._active and asyncore.socket_map:
2021 self._active_lock.acquire()
2022 asyncore.loop(timeout=0.001, count=1)
2023 self._active_lock.release()
2024 asyncore.close_all()
2025
2026 def handle_accept(self):
2027 conn, addr = self.accept()
2028 self.handler_instance = self.Handler(conn)
2029
2030 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002031 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002032 handle_read = handle_connect
2033
2034 def writable(self):
2035 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002036
2037 def handle_error(self):
2038 raise
2039
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002040
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002041@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002042@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2043class TestSendfile(unittest.TestCase):
2044
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002045 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002046 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002047 not sys.platform.startswith("solaris") and \
2048 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002049 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2050 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002051
2052 @classmethod
2053 def setUpClass(cls):
2054 with open(support.TESTFN, "wb") as f:
2055 f.write(cls.DATA)
2056
2057 @classmethod
2058 def tearDownClass(cls):
2059 support.unlink(support.TESTFN)
2060
2061 def setUp(self):
2062 self.server = SendfileTestServer((support.HOST, 0))
2063 self.server.start()
2064 self.client = socket.socket()
2065 self.client.connect((self.server.host, self.server.port))
2066 self.client.settimeout(1)
2067 # synchronize by waiting for "220 ready" response
2068 self.client.recv(1024)
2069 self.sockno = self.client.fileno()
2070 self.file = open(support.TESTFN, 'rb')
2071 self.fileno = self.file.fileno()
2072
2073 def tearDown(self):
2074 self.file.close()
2075 self.client.close()
2076 if self.server.running:
2077 self.server.stop()
2078
2079 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2080 """A higher level wrapper representing how an application is
2081 supposed to use sendfile().
2082 """
2083 while 1:
2084 try:
2085 if self.SUPPORT_HEADERS_TRAILERS:
2086 return os.sendfile(sock, file, offset, nbytes, headers,
2087 trailers)
2088 else:
2089 return os.sendfile(sock, file, offset, nbytes)
2090 except OSError as err:
2091 if err.errno == errno.ECONNRESET:
2092 # disconnected
2093 raise
2094 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2095 # we have to retry send data
2096 continue
2097 else:
2098 raise
2099
2100 def test_send_whole_file(self):
2101 # normal send
2102 total_sent = 0
2103 offset = 0
2104 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002105 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002106 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2107 if sent == 0:
2108 break
2109 offset += sent
2110 total_sent += sent
2111 self.assertTrue(sent <= nbytes)
2112 self.assertEqual(offset, total_sent)
2113
2114 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002115 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002116 self.client.close()
2117 self.server.wait()
2118 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002119 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002120 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002121
2122 def test_send_at_certain_offset(self):
2123 # start sending a file at a certain offset
2124 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002125 offset = len(self.DATA) // 2
2126 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002127 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002128 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002129 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2130 if sent == 0:
2131 break
2132 offset += sent
2133 total_sent += sent
2134 self.assertTrue(sent <= nbytes)
2135
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002136 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002137 self.client.close()
2138 self.server.wait()
2139 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002140 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002141 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002142 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002143 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002144
2145 def test_offset_overflow(self):
2146 # specify an offset > file size
2147 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002148 try:
2149 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2150 except OSError as e:
2151 # Solaris can raise EINVAL if offset >= file length, ignore.
2152 if e.errno != errno.EINVAL:
2153 raise
2154 else:
2155 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002156 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002157 self.client.close()
2158 self.server.wait()
2159 data = self.server.handler_instance.get_data()
2160 self.assertEqual(data, b'')
2161
2162 def test_invalid_offset(self):
2163 with self.assertRaises(OSError) as cm:
2164 os.sendfile(self.sockno, self.fileno, -1, 4096)
2165 self.assertEqual(cm.exception.errno, errno.EINVAL)
2166
Martin Panterbf19d162015-09-09 01:01:13 +00002167 def test_keywords(self):
2168 # Keyword arguments should be supported
2169 os.sendfile(out=self.sockno, offset=0, count=4096,
2170 **{'in': self.fileno})
2171 if self.SUPPORT_HEADERS_TRAILERS:
2172 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002173 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002174
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002175 # --- headers / trailers tests
2176
Serhiy Storchaka43767632013-11-03 21:31:38 +02002177 @requires_headers_trailers
2178 def test_headers(self):
2179 total_sent = 0
2180 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2181 headers=[b"x" * 512])
2182 total_sent += sent
2183 offset = 4096
2184 nbytes = 4096
2185 while 1:
2186 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2187 offset, nbytes)
2188 if sent == 0:
2189 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002190 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002191 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002192
Serhiy Storchaka43767632013-11-03 21:31:38 +02002193 expected_data = b"x" * 512 + self.DATA
2194 self.assertEqual(total_sent, len(expected_data))
2195 self.client.close()
2196 self.server.wait()
2197 data = self.server.handler_instance.get_data()
2198 self.assertEqual(hash(data), hash(expected_data))
2199
2200 @requires_headers_trailers
2201 def test_trailers(self):
2202 TESTFN2 = support.TESTFN + "2"
2203 file_data = b"abcdef"
2204 with open(TESTFN2, 'wb') as f:
2205 f.write(file_data)
2206 with open(TESTFN2, 'rb')as f:
2207 self.addCleanup(os.remove, TESTFN2)
2208 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2209 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002210 self.client.close()
2211 self.server.wait()
2212 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002213 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002214
Serhiy Storchaka43767632013-11-03 21:31:38 +02002215 @requires_headers_trailers
2216 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2217 'test needs os.SF_NODISKIO')
2218 def test_flags(self):
2219 try:
2220 os.sendfile(self.sockno, self.fileno, 0, 4096,
2221 flags=os.SF_NODISKIO)
2222 except OSError as err:
2223 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2224 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002225
2226
Larry Hastings9cf065c2012-06-22 16:30:09 -07002227def supports_extended_attributes():
2228 if not hasattr(os, "setxattr"):
2229 return False
2230 try:
2231 with open(support.TESTFN, "wb") as fp:
2232 try:
2233 os.setxattr(fp.fileno(), b"user.test", b"")
2234 except OSError:
2235 return False
2236 finally:
2237 support.unlink(support.TESTFN)
2238 # Kernels < 2.6.39 don't respect setxattr flags.
2239 kernel_version = platform.release()
2240 m = re.match("2.6.(\d{1,2})", kernel_version)
2241 return m is None or int(m.group(1)) >= 39
2242
2243
2244@unittest.skipUnless(supports_extended_attributes(),
2245 "no non-broken extended attribute support")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002246class ExtendedAttributeTests(unittest.TestCase):
2247
2248 def tearDown(self):
2249 support.unlink(support.TESTFN)
2250
Larry Hastings9cf065c2012-06-22 16:30:09 -07002251 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002252 fn = support.TESTFN
2253 open(fn, "wb").close()
2254 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002255 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002256 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002257 init_xattr = listxattr(fn)
2258 self.assertIsInstance(init_xattr, list)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002259 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002260 xattr = set(init_xattr)
2261 xattr.add("user.test")
2262 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002263 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2264 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2265 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002266 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002267 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002268 self.assertEqual(cm.exception.errno, errno.EEXIST)
2269 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002270 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002271 self.assertEqual(cm.exception.errno, errno.ENODATA)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002272 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002273 xattr.add("user.test2")
2274 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002275 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002276 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002277 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002278 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002279 xattr.remove("user.test")
2280 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002281 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2282 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2283 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2284 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002285 many = sorted("user.test{}".format(i) for i in range(100))
2286 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002287 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002288 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002289
Larry Hastings9cf065c2012-06-22 16:30:09 -07002290 def _check_xattrs(self, *args, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002291 def make_bytes(s):
2292 return bytes(s, "ascii")
Larry Hastings9cf065c2012-06-22 16:30:09 -07002293 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002294 support.unlink(support.TESTFN)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002295 self._check_xattrs_str(make_bytes, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002296
2297 def test_simple(self):
2298 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2299 os.listxattr)
2300
2301 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002302 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2303 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002304
2305 def test_fds(self):
2306 def getxattr(path, *args):
2307 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002308 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002309 def setxattr(path, *args):
2310 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002311 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002312 def removexattr(path, *args):
2313 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002314 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002315 def listxattr(path, *args):
2316 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002317 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002318 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2319
2320
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002321@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2322class Win32DeprecatedBytesAPI(unittest.TestCase):
2323 def test_deprecated(self):
2324 import nt
2325 filename = os.fsencode(support.TESTFN)
2326 with warnings.catch_warnings():
2327 warnings.simplefilter("error", DeprecationWarning)
2328 for func, *args in (
2329 (nt._getfullpathname, filename),
2330 (nt._isdir, filename),
2331 (os.access, filename, os.R_OK),
2332 (os.chdir, filename),
2333 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01002334 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002335 (os.link, filename, filename),
2336 (os.listdir, filename),
2337 (os.lstat, filename),
2338 (os.mkdir, filename),
2339 (os.open, filename, os.O_RDONLY),
2340 (os.rename, filename, filename),
2341 (os.rmdir, filename),
2342 (os.startfile, filename),
2343 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002344 (os.unlink, filename),
2345 (os.utime, filename),
2346 ):
2347 self.assertRaises(DeprecationWarning, func, *args)
2348
Victor Stinner28216442011-11-16 00:34:44 +01002349 @support.skip_unless_symlink
2350 def test_symlink(self):
2351 filename = os.fsencode(support.TESTFN)
2352 with warnings.catch_warnings():
2353 warnings.simplefilter("error", DeprecationWarning)
2354 self.assertRaises(DeprecationWarning,
2355 os.symlink, filename, filename)
2356
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002357
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002358@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2359class TermsizeTests(unittest.TestCase):
2360 def test_does_not_crash(self):
2361 """Check if get_terminal_size() returns a meaningful value.
2362
2363 There's no easy portable way to actually check the size of the
2364 terminal, so let's check if it returns something sensible instead.
2365 """
2366 try:
2367 size = os.get_terminal_size()
2368 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002369 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002370 # Under win32 a generic OSError can be thrown if the
2371 # handle cannot be retrieved
2372 self.skipTest("failed to query terminal size")
2373 raise
2374
Antoine Pitroucfade362012-02-08 23:48:59 +01002375 self.assertGreaterEqual(size.columns, 0)
2376 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002377
2378 def test_stty_match(self):
2379 """Check if stty returns the same results
2380
2381 stty actually tests stdin, so get_terminal_size is invoked on
2382 stdin explicitly. If stty succeeded, then get_terminal_size()
2383 should work too.
2384 """
2385 try:
2386 size = subprocess.check_output(['stty', 'size']).decode().split()
2387 except (FileNotFoundError, subprocess.CalledProcessError):
2388 self.skipTest("stty invocation failed")
2389 expected = (int(size[1]), int(size[0])) # reversed order
2390
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002391 try:
2392 actual = os.get_terminal_size(sys.__stdin__.fileno())
2393 except OSError as e:
2394 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2395 # Under win32 a generic OSError can be thrown if the
2396 # handle cannot be retrieved
2397 self.skipTest("failed to query terminal size")
2398 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002399 self.assertEqual(expected, actual)
2400
2401
Victor Stinner292c8352012-10-30 02:17:38 +01002402class OSErrorTests(unittest.TestCase):
2403 def setUp(self):
2404 class Str(str):
2405 pass
2406
Victor Stinnerafe17062012-10-31 22:47:43 +01002407 self.bytes_filenames = []
2408 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002409 if support.TESTFN_UNENCODABLE is not None:
2410 decoded = support.TESTFN_UNENCODABLE
2411 else:
2412 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002413 self.unicode_filenames.append(decoded)
2414 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002415 if support.TESTFN_UNDECODABLE is not None:
2416 encoded = support.TESTFN_UNDECODABLE
2417 else:
2418 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002419 self.bytes_filenames.append(encoded)
2420 self.bytes_filenames.append(memoryview(encoded))
2421
2422 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002423
2424 def test_oserror_filename(self):
2425 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002426 (self.filenames, os.chdir,),
2427 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002428 (self.filenames, os.lstat,),
2429 (self.filenames, os.open, os.O_RDONLY),
2430 (self.filenames, os.rmdir,),
2431 (self.filenames, os.stat,),
2432 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002433 ]
2434 if sys.platform == "win32":
2435 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002436 (self.bytes_filenames, os.rename, b"dst"),
2437 (self.bytes_filenames, os.replace, b"dst"),
2438 (self.unicode_filenames, os.rename, "dst"),
2439 (self.unicode_filenames, os.replace, "dst"),
Victor Stinner64e039a2012-11-07 00:10:14 +01002440 # Issue #16414: Don't test undecodable names with listdir()
2441 # because of a Windows bug.
2442 #
2443 # With the ANSI code page 932, os.listdir(b'\xe7') return an
2444 # empty list (instead of failing), whereas os.listdir(b'\xff')
2445 # raises a FileNotFoundError. It looks like a Windows bug:
2446 # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
2447 # fails with ERROR_FILE_NOT_FOUND (2), instead of
2448 # ERROR_PATH_NOT_FOUND (3).
2449 (self.unicode_filenames, os.listdir,),
Victor Stinner292c8352012-10-30 02:17:38 +01002450 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002451 else:
2452 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002453 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002454 (self.filenames, os.rename, "dst"),
2455 (self.filenames, os.replace, "dst"),
2456 ))
2457 if hasattr(os, "chown"):
2458 funcs.append((self.filenames, os.chown, 0, 0))
2459 if hasattr(os, "lchown"):
2460 funcs.append((self.filenames, os.lchown, 0, 0))
2461 if hasattr(os, "truncate"):
2462 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002463 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002464 funcs.append((self.filenames, os.chflags, 0))
2465 if hasattr(os, "lchflags"):
2466 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002467 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002468 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002469 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002470 if sys.platform == "win32":
2471 funcs.append((self.bytes_filenames, os.link, b"dst"))
2472 funcs.append((self.unicode_filenames, os.link, "dst"))
2473 else:
2474 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002475 if hasattr(os, "listxattr"):
2476 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002477 (self.filenames, os.listxattr,),
2478 (self.filenames, os.getxattr, "user.test"),
2479 (self.filenames, os.setxattr, "user.test", b'user'),
2480 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002481 ))
2482 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002483 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002484 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002485 if sys.platform == "win32":
2486 funcs.append((self.unicode_filenames, os.readlink,))
2487 else:
2488 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002489
Victor Stinnerafe17062012-10-31 22:47:43 +01002490 for filenames, func, *func_args in funcs:
2491 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002492 try:
2493 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002494 except OSError as err:
Victor Stinner292c8352012-10-30 02:17:38 +01002495 self.assertIs(err.filename, name)
2496 else:
2497 self.fail("No exception thrown by {}".format(func))
2498
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002499class CPUCountTests(unittest.TestCase):
2500 def test_cpu_count(self):
2501 cpus = os.cpu_count()
2502 if cpus is not None:
2503 self.assertIsInstance(cpus, int)
2504 self.assertGreater(cpus, 0)
2505 else:
2506 self.skipTest("Could not determine the number of CPUs")
2507
Victor Stinnerdaf45552013-08-28 00:53:59 +02002508
2509class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002510 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002511 fd = os.open(__file__, os.O_RDONLY)
2512 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002513 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002514
Victor Stinnerdaf45552013-08-28 00:53:59 +02002515 os.set_inheritable(fd, True)
2516 self.assertEqual(os.get_inheritable(fd), True)
2517
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002518 @unittest.skipIf(fcntl is None, "need fcntl")
2519 def test_get_inheritable_cloexec(self):
2520 fd = os.open(__file__, os.O_RDONLY)
2521 self.addCleanup(os.close, fd)
2522 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002523
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002524 # clear FD_CLOEXEC flag
2525 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2526 flags &= ~fcntl.FD_CLOEXEC
2527 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002528
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002529 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002530
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002531 @unittest.skipIf(fcntl is None, "need fcntl")
2532 def test_set_inheritable_cloexec(self):
2533 fd = os.open(__file__, os.O_RDONLY)
2534 self.addCleanup(os.close, fd)
2535 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2536 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002537
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002538 os.set_inheritable(fd, True)
2539 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2540 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002541
Victor Stinnerdaf45552013-08-28 00:53:59 +02002542 def test_open(self):
2543 fd = os.open(__file__, os.O_RDONLY)
2544 self.addCleanup(os.close, fd)
2545 self.assertEqual(os.get_inheritable(fd), False)
2546
2547 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2548 def test_pipe(self):
2549 rfd, wfd = os.pipe()
2550 self.addCleanup(os.close, rfd)
2551 self.addCleanup(os.close, wfd)
2552 self.assertEqual(os.get_inheritable(rfd), False)
2553 self.assertEqual(os.get_inheritable(wfd), False)
2554
2555 def test_dup(self):
2556 fd1 = os.open(__file__, os.O_RDONLY)
2557 self.addCleanup(os.close, fd1)
2558
2559 fd2 = os.dup(fd1)
2560 self.addCleanup(os.close, fd2)
2561 self.assertEqual(os.get_inheritable(fd2), False)
2562
2563 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2564 def test_dup2(self):
2565 fd = os.open(__file__, os.O_RDONLY)
2566 self.addCleanup(os.close, fd)
2567
2568 # inheritable by default
2569 fd2 = os.open(__file__, os.O_RDONLY)
2570 try:
2571 os.dup2(fd, fd2)
2572 self.assertEqual(os.get_inheritable(fd2), True)
2573 finally:
2574 os.close(fd2)
2575
2576 # force non-inheritable
2577 fd3 = os.open(__file__, os.O_RDONLY)
2578 try:
2579 os.dup2(fd, fd3, inheritable=False)
2580 self.assertEqual(os.get_inheritable(fd3), False)
2581 finally:
2582 os.close(fd3)
2583
2584 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2585 def test_openpty(self):
2586 master_fd, slave_fd = os.openpty()
2587 self.addCleanup(os.close, master_fd)
2588 self.addCleanup(os.close, slave_fd)
2589 self.assertEqual(os.get_inheritable(master_fd), False)
2590 self.assertEqual(os.get_inheritable(slave_fd), False)
2591
2592
Antoine Pitrouf26ad712011-07-15 23:00:56 +02002593@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00002594def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002595 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00002596 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002597 StatAttributeTests,
Victor Stinnere12e7aa2015-06-12 21:58:00 +02002598 UtimeTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002599 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002600 WalkTests,
Charles-François Natali7372b062012-02-05 15:15:38 +01002601 FwalkTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002602 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00002603 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00002604 URandomTests,
Ned Deilyab6b9f82015-03-17 04:30:08 -07002605 URandomFDTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00002606 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002607 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002608 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00002609 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00002610 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002611 Win32KillTests,
Tim Golden781bbeb2013-10-25 20:24:06 +01002612 Win32ListdirTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00002613 Win32SymlinkTests,
Jason R. Coombs3a092862013-05-27 23:21:28 -04002614 NonLocalSymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00002615 FSEncodingTests,
Brett Cannonefb00c02012-02-29 18:31:31 -05002616 DeviceEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002617 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002618 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00002619 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002620 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002621 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04002622 ExtendedAttributeTests,
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002623 Win32DeprecatedBytesAPI,
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002624 TermsizeTests,
Victor Stinner292c8352012-10-30 02:17:38 +01002625 OSErrorTests,
Andrew Svetlov405faed2012-12-25 12:18:09 +02002626 RemoveDirsTests,
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002627 CPUCountTests,
Victor Stinnerdaf45552013-08-28 00:53:59 +02002628 FDInheritanceTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002629 )
Fred Drake2e2be372001-09-20 21:33:42 +00002630
2631if __name__ == "__main__":
2632 test_main()