blob: 31f2cc36e6a98c0ef3d59c1dcb0b2b59975f3a08 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070027import decimal
28import fractions
Christian Heimes25827622013-10-12 01:27:08 +020029import pickle
Victor Stinnerfe02e392014-12-21 01:16:38 +010030import sysconfig
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000031try:
32 import threading
33except ImportError:
34 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020043
Georg Brandl2daf6ae2012-02-20 19:54:16 +010044from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000045
Victor Stinner034d0aa2012-06-05 01:22:15 +020046with warnings.catch_warnings():
47 warnings.simplefilter("ignore", DeprecationWarning)
48 os.stat_float_times(True)
Victor Stinner1aa54a42012-02-08 04:09:37 +010049st = os.stat(__file__)
50stat_supports_subsecond = (
51 # check if float and int timestamps are different
52 (st.st_atime != st[7])
53 or (st.st_mtime != st[8])
54 or (st.st_ctime != st[9]))
55
# True only on a Linux system that still uses the (outdated, unmaintained)
# linuxthreads threading library.  Combining linuxthreads with a failed
# execv call is problematic: see http://bugs.python.org/issue4970.
# Interpreters without sys.thread_info, or with an empty version string,
# are treated as not using linuxthreads.
USING_LINUXTHREADS = (
    hasattr(sys, 'thread_info')
    and bool(sys.thread_info.version)
    and sys.thread_info.version.startswith("linuxthreads"))
Brian Curtineb24d742010-04-12 17:16:38 +000064
# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group
# (gid 0).  os.getgid() is only consulted on FreeBSD.
HAVE_WHEEL_GROUP = (
    (os.getgid() == 0) if sys.platform.startswith('freebsd') else False)
67
Thomas Wouters0e3f5912006-08-11 14:57:12 +000068# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Exercise low-level os file primitives against ``support.TESTFN``.

    setUp doubles as tearDown: both remove TESTFN when it exists, so each
    test starts and ends without the scratch file.
    """

    def setUp(self):
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A file we have just created must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A rejected os.rename() call (TypeError for the int argument) must
        # not change the reference count of the path it was given.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # Rewind the raw descriptor before reading back what was written.
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails;
            # otherwise the fd leaks into the rest of the test run.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args) and
        close the resulting file object."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination: afterwards
        # the source is gone and the destination holds the source's content.
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
174
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200175
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000176# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Test the results of the os.*stat* family and os.utime() timestamps.

    Each test runs against a scratch directory ``support.TESTFN`` that
    contains one three-byte file, ``self.fname``.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        # Use a context manager so the file is closed even if write() fails.
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        """Check the tuple and attribute interfaces of ``os.stat(fname)``.

        *fname* is the path (str or bytes) of the three-byte scratch file.
        """
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # Time attributes are truncated to int before being
                    # compared with the corresponding tuple slot.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated but not required here.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other failure is a real error; swallowing it would only
            # resurface as a NameError on 'result' below.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated but not required here.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Do not hide other errors behind a later NameError.
            raise

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def _test_utime(self, filename, attr, utime, delta):
        """Common checks for the different ways of setting file times.

        filename -- path whose timestamps are read and set
        attr     -- callable(stat_result, name) returning one timestamp
        utime    -- callable(filename, times) that sets (atime, mtime)
        delta    -- tolerance, in the unit returned by *attr*, allowed
                    between two consecutive "set to now" calls
        """
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Stat the path that was just touched (not always support.TESTFN),
        # so st2 and st3 describe the same object.
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)


    def _test_utime_ns(self, set_times_ns, test_dir=True):
        """Run _test_utime() on the ``*_ns`` stat fields with a 10 s tolerance.

        set_times_ns -- callable(filename, times) taking nanosecond times
        test_dir     -- also run the checks on the scratch directory
        """
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    # Skip decorators for the optional os.utime() capabilities.
    requires_utime_dir_fd = unittest.skipUnless(
                                os.utime in os.supports_dir_fd,
                                "dir_fd support for utime required for this test.")
    requires_utime_fd = unittest.skipUnless(
                                os.utime in os.supports_fd,
                                "fd support for utime required for this test.")
    requires_utime_nofollow_symlinks = unittest.skipUnless(
                                os.utime in os.supports_follow_symlinks,
                                "follow_symlinks support for utime required for this test.")

    @requires_utime_nofollow_symlinks
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.utime(file, ns=times, follow_symlinks=False)
        self._test_utime_ns(lutimes_ns)

    @requires_utime_fd
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.utime(f.fileno(), ns=times)
        # The setter opens its target with open(..., "wb"), so the
        # directory variant is not run.
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        """os.<name>() must reject 'times' and 'ns' given together."""
        with self.assertRaises(ValueError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)


    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        """Set sub-second atime/mtime through *set_time_func* and read back.

        set_time_func -- callable(filename, atime, mtime)
        """
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        # Reset both timestamps first so a no-op setter is detected.
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    # NOTE: currently identical to test_futimes_subsecond.
    @requires_utime_fd
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                             times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_utime_nofollow_symlinks
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime), follow_symlinks=False)
        self._test_utime_subsecond(set_time)

    # NOTE: currently identical to test_futimesat_subsecond.
    @requires_utime_dir_fd
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                             times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict tests to Win32, since there is no guarantee other
    # systems support centiseconds
    def get_file_system(path):
        # Called at class-definition time from the skip decorators below,
        # hence no 'self'.  On win32, returns the file system name that
        # GetVolumeInformationW() reports for the drive holding *path*;
        # otherwise (or if that call fails) falls through and returns None.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_1565150(self):
        t1 = 1159195039.25
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_large_time(self):
        t1 = 5000000000 # some day in 2128
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100535
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000536from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000537
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000538class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000539 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000540 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000541
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000542 def setUp(self):
543 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000544 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000545 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000546 for key, value in self._reference().items():
547 os.environ[key] = value
548
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000549 def tearDown(self):
550 os.environ.clear()
551 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000552 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000553 os.environb.clear()
554 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000555
Christian Heimes90333392007-11-01 19:08:42 +0000556 def _reference(self):
557 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
558
559 def _empty_mapping(self):
560 os.environ.clear()
561 return os.environ
562
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000563 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300564 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000565 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000566 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300567 os.environ.update(HELLO="World")
568 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
569 value = popen.read().strip()
570 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000571
Ezio Melottic7e139b2012-09-26 20:01:34 +0300572 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000573 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300574 with os.popen(
575 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
576 it = iter(popen)
577 self.assertEqual(next(it), "line1\n")
578 self.assertEqual(next(it), "line2\n")
579 self.assertEqual(next(it), "line3\n")
580 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000581
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000582 # Verify environ keys and values from the OS are of the
583 # correct str type.
584 def test_keyvalue_types(self):
585 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000586 self.assertEqual(type(key), str)
587 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000588
Christian Heimes90333392007-11-01 19:08:42 +0000589 def test_items(self):
590 for key, value in self._reference().items():
591 self.assertEqual(os.environ.get(key), value)
592
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000593 # Issue 7310
594 def test___repr__(self):
595 """Check that the repr() of os.environ looks like environ({...})."""
596 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000597 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
598 '{!r}: {!r}'.format(key, value)
599 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000600
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000601 def test_get_exec_path(self):
602 defpath_list = os.defpath.split(os.pathsep)
603 test_path = ['/monty', '/python', '', '/flying/circus']
604 test_env = {'PATH': os.pathsep.join(test_path)}
605
606 saved_environ = os.environ
607 try:
608 os.environ = dict(test_env)
609 # Test that defaulting to os.environ works.
610 self.assertSequenceEqual(test_path, os.get_exec_path())
611 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
612 finally:
613 os.environ = saved_environ
614
615 # No PATH environment variable
616 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
617 # Empty PATH environment variable
618 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
619 # Supplied PATH environment variable
620 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
621
Victor Stinnerb745a742010-05-18 17:17:23 +0000622 if os.supports_bytes_environ:
623 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000624 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000625 # ignore BytesWarning warning
626 with warnings.catch_warnings(record=True):
627 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000628 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000629 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000630 pass
631 else:
632 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000633
634 # bytes key and/or value
635 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
636 ['abc'])
637 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
638 ['abc'])
639 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
640 ['abc'])
641
642 @unittest.skipUnless(os.supports_bytes_environ,
643 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000644 def test_environb(self):
645 # os.environ -> os.environb
646 value = 'euro\u20ac'
647 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000648 value_bytes = value.encode(sys.getfilesystemencoding(),
649 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000650 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000651 msg = "U+20AC character is not encodable to %s" % (
652 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000653 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000654 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000655 self.assertEqual(os.environ['unicode'], value)
656 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000657
658 # os.environb -> os.environ
659 value = b'\xff'
660 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000661 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000662 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000663 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000664
Charles-François Natali2966f102011-11-26 11:32:46 +0100665 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
666 # #13415).
667 @support.requires_freebsd_version(7)
668 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100669 def test_unset_error(self):
670 if sys.platform == "win32":
671 # an environment variable is limited to 32,767 characters
672 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100673 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100674 else:
675 # "=" is not allowed in a variable name
676 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100677 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100678
Victor Stinner6d101392013-04-14 16:35:04 +0200679 def test_key_type(self):
680 missing = 'missingkey'
681 self.assertNotIn(missing, os.environ)
682
Victor Stinner839e5ea2013-04-14 16:43:03 +0200683 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200684 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200685 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200686 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200687
Victor Stinner839e5ea2013-04-14 16:43:03 +0200688 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200689 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200690 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200691 self.assertTrue(cm.exception.__suppress_context__)
692
Victor Stinner6d101392013-04-14 16:35:04 +0200693
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # to test both functions with the same code base
    def walk(self, directory, topdown=True, follow_symlinks=False):
        """Yield (root, dirs, files) triples produced by os.walk().

        The test methods call this wrapper instead of os.walk() so that a
        subclass can override it and run the same tests on another walker.
        """
        walk_it = os.walk(directory,
                          topdown=topdown,
                          followlinks=follow_symlinks)
        for root, dirs, files in walk_it:
            yield (root, dirs, files)

    def setUp(self):
        """Create the directory tree under support.TESTFN used by the tests."""
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # "with" guarantees the handle is closed even if write() fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        # Walk top-down.
        # Go through self.walk() rather than os.walk() so that subclasses
        # overriding walk() exercise their own walker in this test too.
        visited = list(self.walk(self.walk_path))

        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = visited[0][1][0] != "SUB1"
        visited[0][1].sort()
        visited[3 - 2 * flipped][-1].sort()
        self.assertEqual(visited[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(visited[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        # Prune the search.
        visited = []
        for root, dirs, files in self.walk(self.walk_path):
            visited.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to visited!
                dirs.remove('SUB1')

        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0],
                         (self.walk_path, ["SUB2"], ["tmp1"]))

        visited[1][-1].sort()
        self.assertEqual(visited[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        visited = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = visited[3][1][0] != "SUB1"
        visited[3][1].sort()
        visited[2 - 2 * flipped][-1].sort()
        self.assertEqual(visited[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(visited[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is listed in dirs but has to be
                # removed like a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000833
Charles-François Natali7372b062012-02-05 15:15:38 +0100834
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, directory, topdown=True, follow_symlinks=False):
        """Yield (root, dirs, files) triples from os.fwalk(), dropping the fd."""
        walk_it = os.fwalk(directory,
                           topdown=topdown,
                           follow_symlinks=follow_symlinks)
        for root, dirs, files, root_fd in walk_it:
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        For every combination of topdown and symlink-following, the
        directories yielded by os.fwalk(**fwalk_kwargs) must each appear in
        the os.walk(**walk_kwargs) results with the same dirs and files.
        """
        # Work on copies so the caller's dicts are not modified.
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide that error behind a NameError on an unbound fd.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Do not follow symlinks: a link to a directory must be
                # unlinked, only a real directory is removed with rmdir().
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
916
917
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), plus an argument-type check for os.chown()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # Restore the process umask even when a check above fails, so a
            # failure here cannot affect later tests.
            os.umask(old_mask)

    @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
    def test_chown_uid_gid_arguments_must_be_index(self):
        # Named "st" so the module-level "stat" import is not shadowed.
        st = os.stat(support.TESTFN)
        uid = st.st_uid
        gid = st.st_gid
        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Remove the file inside the test: tearDown() calls
            # os.removedirs() and would fail on a leftover regular file.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1007
Andrew Svetlov405faed2012-12-25 12:18:09 +02001008
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira/dirb and return both paths, outer one first.
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Nothing but empty directories: the whole chain is removed.
        outer, inner = self._make_nested_dirs()
        os.removedirs(inner)
        for removed in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(removed))

    def test_remove_partial(self):
        # A file in dira stops the removal after dirb.
        outer, inner = self._make_nested_dirs()
        with open(os.path.join(outer, 'file.txt'), 'w') as stream:
            stream.write('text')
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for kept in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb makes the call fail and leaves everything in place.
        outer, inner = self._make_nested_dirs()
        with open(os.path.join(inner, 'file.txt'), 'w') as stream:
            stream.write('text')
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for kept in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1050
1051
class DevNullTests(unittest.TestCase):
    """Tests for the file named by os.devnull."""

    def test_devnull(self):
        """os.devnull accepts a write and yields no data when read."""
        # The with statement closes the file on exit, so no explicit
        # close() call is needed inside the block.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001059
Andrew Svetlov405faed2012-12-25 12:18:09 +02001060
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        # os.urandom(n) must return exactly n bytes, including n == 0.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return the bytes a child interpreter printed from os.urandom(count)."""
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare with the requested size instead of a hard-coded 16, so the
        # helper stays correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001089
Victor Stinnerfe02e392014-12-21 01:16:38 +01001090
# True when the build configuration reports HAVE_GETENTROPY == 1.
HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)

@unittest.skipIf(HAVE_GETENTROPY,
                 "getentropy() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for how os.urandom() handles its file descriptor.

    Every scenario runs in a child interpreter via assert_python_ok().
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            os.urandom(4)
            os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # The result is not inspected; the child only has to run cleanly.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        with open(support.TESTFN, 'wb') as f:
            f.write(b"x" * 256)
        self.addCleanup(os.unlink, support.TESTFN)
        code = """if 1:
            import os
            import sys
            os.urandom(4)
            for fd in range(3, 256):
                try:
                    os.close(fd)
                except OSError:
                    pass
                else:
                    # Found the urandom fd (XXX hopefully)
                    break
            os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # Only the child's stdout (second item of the result) is needed.
        out = assert_python_ok('-Sc', code)[1]
        self.assertEqual(len(out), 8)
        # The two 4-byte reads within one run must differ ...
        self.assertNotEqual(out[0:4], out[4:8])
        out2 = assert_python_ok('-Sc', code)[1]
        self.assertEqual(len(out2), 8)
        # ... and so must the output of two separate runs.
        self.assertNotEqual(out2, out)
1161
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001162
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager that swaps os.execv and os.execve for recording stubs.

    The value bound by "with ... as" is a list that receives one
    (function name, first arg, remaining args) tuple per stub call.  Both
    stubs always raise, since a real exec call would never return.  When
    defpath is not None, os.defpath is replaced by it as well.  The three
    original attributes are put back on exit.
    """
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Remember the real attributes so the finally clause can restore them.
    saved = (os.execv, os.execve, os.defpath)
    try:
        os.execv, os.execve = fake_execv, fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv, os.execve, os.defpath = saved
1195
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with str or bytes program names (test_type)."""
        search_dir = os.sep + 'absolutepath'
        if test_type is bytes:
            exe_name = b'executable'
            argv = [b'progname', 'arg1', 'arg2']
            exe_path = os.path.join(os.fsencode(search_dir), exe_name)
            expected_path = exe_path
        else:
            exe_name = 'executable'
            argv = ['progname', 'arg1', 'arg2']
            exe_path = os.path.join(search_dir, exe_name)
            # Outside of "nt" the recorded path is the fsencoded form.
            if os.name == "nt":
                expected_path = exe_path
            else:
                expected_path = os.fsencode(exe_path)
        base_env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(exe_path, argv)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', exe_path, (argv,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            with self.assertRaises(OSError):
                os._execvpe(exe_name, argv, env=base_env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', expected_path, (argv, base_env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_with_path = base_env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_with_path[path_key] = search_dir
            with self.assertRaises(OSError):
                os._execvpe(exe_name, argv, env=env_with_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', expected_path, (argv, env_with_path)))

    def test_internal_execvpe_str(self):
        # The bytes variant is only run when os.name is not "nt".
        variants = [str]
        if os.name != "nt":
            variants.append(bytes)
        for variant in variants:
            self._test_internal_execvpe(variant)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001259
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001260
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Each os call made on support.TESTFN below is expected to raise OSError."""

    def test_rename(self):
        backup_name = support.TESTFN + ".bak"
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, backup_name)

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir() is attempted while a file of that name exists and is open.
        handle = open(support.TESTFN, "w")
        try:
            with handle:
                self.assertRaises(OSError, os.mkdir, support.TESTFN)
        finally:
            # The handle is closed by now, so the file can be deleted.
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001285
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001286class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001287 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001288 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1289 #singles.append("close")
    # We omit close because it doesn't raise an exception on some platforms
1291 def get_single(f):
1292 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001293 if hasattr(os, f):
1294 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001295 return helper
1296 for f in singles:
1297 locals()["test_"+f] = get_single(f)
1298
Benjamin Peterson7522c742009-01-19 21:00:09 +00001299 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001300 try:
1301 f(support.make_bad_fd(), *args)
1302 except OSError as e:
1303 self.assertEqual(e.errno, errno.EBADF)
1304 else:
1305 self.fail("%r didn't raise a OSError with a bad file descriptor"
1306 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001307
Serhiy Storchaka43767632013-11-03 21:31:38 +02001308 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001309 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001310 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001311
Serhiy Storchaka43767632013-11-03 21:31:38 +02001312 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001313 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001314 fd = support.make_bad_fd()
1315 # Make sure none of the descriptors we are about to close are
1316 # currently valid (issue 6542).
1317 for i in range(10):
1318 try: os.fstat(fd+i)
1319 except OSError:
1320 pass
1321 else:
1322 break
1323 if i < 2:
1324 raise unittest.SkipTest(
1325 "Unable to acquire a range of invalid file descriptors")
1326 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001327
Serhiy Storchaka43767632013-11-03 21:31:38 +02001328 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001329 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001330 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001331
Serhiy Storchaka43767632013-11-03 21:31:38 +02001332 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001333 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001334 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001335
Serhiy Storchaka43767632013-11-03 21:31:38 +02001336 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001337 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001338 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001339
Serhiy Storchaka43767632013-11-03 21:31:38 +02001340 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001341 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001342 self.check(os.pathconf, "PC_NAME_MAX")
1343 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001344
Serhiy Storchaka43767632013-11-03 21:31:38 +02001345 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001346 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001347 self.check(os.truncate, 0)
1348 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001349
Serhiy Storchaka43767632013-11-03 21:31:38 +02001350 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001351 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001352 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001353
Serhiy Storchaka43767632013-11-03 21:31:38 +02001354 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001355 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001356 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001357
Victor Stinner57ddf782014-01-08 15:21:28 +01001358 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1359 def test_readv(self):
1360 buf = bytearray(10)
1361 self.check(os.readv, [buf])
1362
Serhiy Storchaka43767632013-11-03 21:31:38 +02001363 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001364 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001365 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001366
Serhiy Storchaka43767632013-11-03 21:31:38 +02001367 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001368 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001369 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001370
Victor Stinner57ddf782014-01-08 15:21:28 +01001371 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1372 def test_writev(self):
1373 self.check(os.writev, [b'abc'])
1374
Brian Curtin1b9df392010-11-24 20:24:31 +00001375
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII names."""

    def setUp(self):
        """Pick the two path names used as link source and link target."""
        base = support.TESTFN
        self.file1 = base
        self.file2 = base + "2"

    def tearDown(self):
        """Remove whichever of the two files a test left behind."""
        for path in (self.file1, self.file2):
            if not os.path.exists(path):
                continue
            os.unlink(path)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it as *file2*, verify they are one file."""
        with open(file1, "w") as source:
            source.write("test")

        # DeprecationWarning raised while linking is silenced on purpose.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)

        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # Same check, with both names encoded to bytes using the
        # filesystem encoding.
        source = self.file1.encode(sys.getfilesystemencoding())
        target = self.file2.encode(sys.getfilesystemencoding())
        self._test_link(source, target)

    def test_unicode_name(self):
        # Skip when the filesystem encoding cannot represent "\xf1".
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            self.skipTest("Unable to encode for this platform.")

        self.file1 = self.file1 + "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1412
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Tests for the os.set*uid() / os.set*gid() family.

    Each test expects OverflowError for the id 1<<32.  Requesting id 0 is
    expected to raise OSError when os.getuid() is not 0 (and, for the
    group variants, when HAVE_WHEEL_GROUP is false).
    """

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # The overflow check applies to either argument position.
        with self.assertRaises(OverflowError):
            os.setreuid(1 << 32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1 << 32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        command = [
            sys.executable, '-c',
            'import os,sys;os.setreuid(-1,-1);sys.exit(0)',
        ]
        subprocess.check_call(command)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        # The overflow check applies to either argument position.
        with self.assertRaises(OverflowError):
            os.setregid(1 << 32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1 << 32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        command = [
            sys.executable, '-c',
            'import os,sys;os.setregid(-1,-1);sys.exit(0)',
        ]
        subprocess.check_call(command)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001468
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Filesystem calls on a directory populated with non-ASCII names."""

    def setUp(self):
        """Create self.dir and fill it with one empty file per usable name.

        The test is skipped when none of the candidate names from
        test.support can be encoded with os.fsencode().
        """
        # First truthy candidate wins; plain TESTFN is the fallback.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # TESTFN_UNICODE is always tried; the two optional names only when
        # test.support provides a truthy value for them.
        candidates = [support.TESTFN_UNICODE]
        for optional in (support.TESTFN_UNENCODABLE, support.TESTFN_NONASCII):
            if optional:
                candidates.append(optional)

        encoded_names = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            encoded_names.append(encoded)
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in encoded_names:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() does not run when setUp() fails, so clean up here
            # before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        """Remove the directory tree created by setUp()."""
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            implicit = set(os.listdir())
            explicit = set(os.listdir(os.sep))
            self.assertEqual(implicit, explicit)
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Every created name must be openable; nothing is read.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_path = os.path.join(self.dir, name)
            os.stat(full_path)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001540
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        """Kill a child interpreter with *sig* and check its exit code.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready.  When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting.  This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1 s apart, for the child's message.
        tries, max_tries = 0, 100
        while tries < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            tries += 1
        else:
            # Reached when the child exited early or the tries ran out.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a helper script and check it exits.

        *name* is only used in the failure message.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test ends, whether it passes or fails.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        # NOTE(review): presumably the helper script sets the mapped byte to
        # 1 once it is ready; confirm against win_console_handler.py.
        tries, max_tries = 0, 100
        while tries < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            tries += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # The loop also ends when the child has already exited; killing
            # a dead process would raise and hide the failure below.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # code of 0 also means the child has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1655
1656
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        """Create SUB0/SUB1 and FILE0/FILE1 under support.TESTFN.

        self.created_paths ends up holding the four entry names, sorted.
        """
        self.created_paths = []
        root = support.TESTFN
        for index in range(2):
            dir_name = 'SUB{}'.format(index)
            file_name = 'FILE{}'.format(index)
            os.makedirs(os.path.join(root, dir_name))
            file_path = os.path.join(root, file_name)
            with open(file_path, 'w') as out:
                out.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths += [dir_name, file_name]
        self.created_paths.sort()

    def tearDown(self):
        """Remove the whole tree created by setUp()."""
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed_bytes = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected_bytes = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed_bytes, expected_bytes)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)
        # unicode
        extended = '\\\\?\\' + absolute
        self.assertEqual(sorted(os.listdir(extended)), self.created_paths)
        # bytes
        extended_bytes = b'\\\\?\\' + os.fsencode(absolute)
        expected_bytes = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(sorted(os.listdir(extended_bytes)), expected_bytes)
1701
1702
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and stat behaviour on Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        """Check that the targets exist and no link is left from a prior run."""
        # unittest assertions are used instead of bare assert statements so
        # the preconditions are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        """Remove any link a test created."""
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): the missing link's target does not exist, so exists()
        # would not see the link itself.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        """Create a "directory" link to a non-existent target."""
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check stat() follows *link* to *target* and lstat() does not.

        The same two checks are repeated with the link name as bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so the cleanup in "finally" can
        # always refer to it, even when an early mkdir() fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so a failure during
            # setup is not masked by a second error raised from cleanup.
            if os.path.exists(file1):
                os.remove(file1)
            if os.path.isdir(level1):
                shutil.rmtree(level1)
1820
Brian Curtind40e6f72010-07-08 21:39:08 +00001821
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        # Raw docstring: the diagram below contains a backslash that would
        # otherwise be an invalid escape sequence.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        """Remove the 'base' tree created by setUp()."""
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion is used instead of a bare assert statement,
        # which is removed under -O and would leave the test checking nothing.
        self.assertTrue(os.path.isdir(src))
1852
1853
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() must come
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        samples = ('unicode\u0141', 'latin\xe9', 'ascii')
        for sample in samples:
            try:
                encoded = os.fsencode(sample)
            except UnicodeEncodeError:
                # Names the filesystem encoding cannot encode are ignored.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), sample)
Victor Stinnere8d51452010-08-19 01:05:19 +00001867
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001868
Brett Cannonefb00c02012-02-29 18:31:31 -05001869
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # With fd 0 being a tty, an encoding name known to codecs is expected.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
1883
1884
class PidTests(unittest.TestCase):
    """Tests for process-id queries in os."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child interpreter prints the pid of its parent on stdout.
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1894
1895
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Tests for os.getlogin(); the class is currently skipped as a whole."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
1904
1905
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise this process's priority value by one, read it back, then
        # try to restore the starting value.
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                self.skipTest(
                    "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as exc:
                # EACCES while restoring is tolerated; anything else
                # propagates.
                if exc.errno != errno.EACCES:
                    raise
1928
1929
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001930if threading is not None:
1931 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001932
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001933 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001934
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001935 def __init__(self, conn):
1936 asynchat.async_chat.__init__(self, conn)
1937 self.in_buffer = []
1938 self.closed = False
1939 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001940
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001941 def handle_read(self):
1942 data = self.recv(4096)
1943 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001944
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001945 def get_data(self):
1946 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001947
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001948 def handle_close(self):
1949 self.close()
1950 self.closed = True
1951
1952 def handle_error(self):
1953 raise
1954
1955 def __init__(self, address):
1956 threading.Thread.__init__(self)
1957 asyncore.dispatcher.__init__(self)
1958 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1959 self.bind(address)
1960 self.listen(5)
1961 self.host, self.port = self.socket.getsockname()[:2]
1962 self.handler_instance = None
1963 self._active = False
1964 self._active_lock = threading.Lock()
1965
1966 # --- public API
1967
1968 @property
1969 def running(self):
1970 return self._active
1971
1972 def start(self):
1973 assert not self.running
1974 self.__flag = threading.Event()
1975 threading.Thread.start(self)
1976 self.__flag.wait()
1977
1978 def stop(self):
1979 assert self.running
1980 self._active = False
1981 self.join()
1982
1983 def wait(self):
1984 # wait for handler connection to be closed, then stop the server
1985 while not getattr(self.handler_instance, "closed", False):
1986 time.sleep(0.001)
1987 self.stop()
1988
1989 # --- internals
1990
1991 def run(self):
1992 self._active = True
1993 self.__flag.set()
1994 while self._active and asyncore.socket_map:
1995 self._active_lock.acquire()
1996 asyncore.loop(timeout=0.001, count=1)
1997 self._active_lock.release()
1998 asyncore.close_all()
1999
2000 def handle_accept(self):
2001 conn, addr = self.accept()
2002 self.handler_instance = self.Handler(conn)
2003
    def handle_connect(self):
        """Close this dispatcher when a connect event is reported on it."""
        self.close()
    # Read events on this dispatcher are handled exactly like connect events.
    # NOTE(review): presumably neither event is expected on the listening
    # socket itself -- confirm.
    handle_read = handle_connect
2007
    def writable(self):
        # Always falsy.
        # NOTE(review): presumably this tells asyncore never to poll the
        # listening socket for writability -- confirm against asyncore docs.
        return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002010
    def handle_error(self):
        # A bare "raise" re-raises the exception currently being handled
        # instead of swallowing it.
        # NOTE(review): presumably asyncore invokes this hook from inside an
        # "except" block (a bare raise needs an active exception) -- confirm.
        raise
2013
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002014
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    # Tests for os.sendfile().  Each test sends (part of) a file written in
    # setUpClass() to a SendfileTestServer running in a background thread
    # and then checks what the server-side handler received.

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # os.sendfile() is only called with headers/trailers on platforms other
    # than Linux and Solaris/SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries for as long as os.sendfile() fails with EAGAIN or EBUSY and
        returns whatever os.sendfile() returns.  Any other OSError
        (ECONNRESET included) propagates to the caller.  *headers* and
        *trailers* are only passed on when SUPPORT_HEADERS_TRAILERS is true;
        None means "none of them".
        """
        # Fresh lists per call instead of shared mutable default arguments.
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    # disconnected (ECONNRESET) or any other failure
                    raise
                # we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # compare the lengths first: a mismatch there gives a short message
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # a negative offset must be rejected with EINVAL
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        # send a 512-byte header followed by the whole file
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512])
        total_sent += sent
        # NOTE(review): continuing at 4096 assumes the first call sent the
        # header plus the full 4096 bytes of file data -- confirm.
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # NOTE(review): hashes are compared rather than the data itself,
        # presumably to keep a failure message short -- confirm.
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        # send a small file followed by a 4-byte trailer
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"
        # Registered before the file exists so that it is removed even when
        # writing or reopening it fails; support.unlink() is used because the
        # file may then be missing.
        self.addCleanup(support.unlink, TESTFN2)
        with open(TESTFN2, 'wb') as f:
            f.write(file_data)
        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # only checks that the flags argument is accepted; EBUSY and EAGAIN
        # are tolerated
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002191
2192
def supports_extended_attributes():
    """Return True if extended attributes are usable for the tests.

    That requires os.setxattr() to exist, setting an attribute on a scratch
    file (support.TESTFN) to succeed, and the kernel not to be a 2.6.x
    release older than 2.6.39.  The scratch file is always removed.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: only a literal "2.6.<n>" prefix matches,
    # and "\d" is no longer an invalid escape in a normal string literal.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2208
2209
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    # Runs one get/set/remove/list scenario against os.*xattr() three ways:
    # by path, by path with follow_symlinks=False, and by file descriptor.

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the xattr scenario on support.TESTFN.

        *s* converts each attribute name (given as str) to the type under
        test.  **kwargs is forwarded to every getxattr, setxattr and
        removexattr call; listxattr is always called with the path only.
        """
        path = support.TESTFN
        with open(path, "wb"):
            pass

        def check_fails(code, func, *args):
            # func(*args, **kwargs) must raise OSError with errno == code
            with self.assertRaises(OSError) as caught:
                func(*args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        # nothing set yet
        check_fails(errno.ENODATA, getxattr, path, s("user.test"))
        baseline = listxattr(path)
        self.assertIsInstance(baseline, list)

        # create, read back, replace
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(baseline) | {"user.test"}
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name, XATTR_REPLACE on a missing one
        check_fails(errno.EEXIST, setxattr, path, s("user.test"), b"bye",
                    os.XATTR_CREATE)
        check_fails(errno.ENODATA, setxattr, path, s("user.test2"), b"bye",
                    os.XATTR_REPLACE)
        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # removal
        removexattr(path, s("user.test"), **kwargs)
        check_fails(errno.ENODATA, getxattr, path, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # a 1024-byte value
        setxattr(path, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), b"a"*1024)
        removexattr(path, s("user.test"), **kwargs)

        # one hundred attributes at once
        names = sorted(["user.test{}".format(n) for n in range(100)])
        for name in names:
            setxattr(path, name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(baseline) | set(names))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario with str attribute names, then with bytes ones."""
        def to_ascii_bytes(text):
            return text.encode("ascii")
        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(to_ascii_bytes, *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # same scenario, with every call going through a file descriptor
        def fd_getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def fd_setxattr(path, *args):
            with open(path, "wb") as stream:
                os.setxattr(stream.fileno(), *args)

        def fd_removexattr(path, *args):
            with open(path, "wb") as stream:
                os.removexattr(stream.fileno(), *args)

        def fd_listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(fd_getxattr, fd_setxattr, fd_removexattr,
                           fd_listxattr)
2285
2286
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    # With DeprecationWarning turned into an error, every call below made
    # with a bytes filename is expected to raise it.

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # (function, *arguments) for each call under test
        calls = [
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        ]
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            for func, *args in calls:
                with self.assertRaises(DeprecationWarning):
                    func(*args)

    @support.skip_unless_symlink
    def test_symlink(self):
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(filename, filename)
2322
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002323
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        The real terminal size cannot be checked portably, so this only
        verifies that both reported dimensions are non-negative.
        """
        try:
            size = os.get_terminal_size()
        except OSError as e:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            unexpected = (sys.platform != "win32" and
                          e.errno not in (errno.EINVAL, errno.ENOTTY))
            if unexpected:
                raise
            self.skipTest("failed to query terminal size")

        for dimension in (size.columns, size.lines):
            self.assertGreaterEqual(dimension, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty inspects stdin, so get_terminal_size() is called on stdin
        explicitly.  The test is skipped when stty cannot be run.
        """
        try:
            output = subprocess.check_output(['stty', 'size'])
            fields = output.decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        # the output order is reversed relative to get_terminal_size()
        expected = (int(fields[1]), int(fields[0]))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as e:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            unexpected = (sys.platform != "win32" and
                          e.errno not in (errno.EINVAL, errno.ENOTTY))
            if unexpected:
                raise
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
2366
2367
class OSErrorTests(unittest.TestCase):
    # Checks that an OSError raised by an os function carries, as its
    # "filename" attribute, the very object that was passed in.

    def setUp(self):
        class Str(str):
            pass

        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        # each name in two flavours: the plain type plus a str subclass
        # (unicode) or a memoryview (bytes)
        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, memoryview(encoded)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"
        every_name = self.filenames

        # Each entry: (filenames to try, function, *extra arguments)
        cases = [
            (every_name, os.chdir,),
            (every_name, os.chmod, 0o777),
            (every_name, os.lstat,),
            (every_name, os.open, os.O_RDONLY),
            (every_name, os.rmdir,),
            (every_name, os.stat,),
            (every_name, os.unlink,),
        ]
        if on_windows:
            cases += [
                (self.bytes_filenames, os.rename, b"dst"),
                (self.bytes_filenames, os.replace, b"dst"),
                (self.unicode_filenames, os.rename, "dst"),
                (self.unicode_filenames, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (self.unicode_filenames, os.listdir,),
            ]
        else:
            cases += [
                (every_name, os.listdir,),
                (every_name, os.rename, "dst"),
                (every_name, os.replace, "dst"),
            ]

        # functions that only exist on some platforms, with their extra args
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                cases.append((every_name, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                cases.append((self.bytes_filenames, os.link, b"dst"))
                cases.append((self.unicode_filenames, os.link, "dst"))
            else:
                cases.append((every_name, os.link, "dst"))
        if hasattr(os, "listxattr"):
            cases += [
                (every_name, os.listxattr,),
                (every_name, os.getxattr, "user.test"),
                (every_name, os.setxattr, "user.test", b'user'),
                (every_name, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            cases.append((every_name, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            readlink_names = self.unicode_filenames if on_windows else every_name
            cases.append((readlink_names, os.readlink,))

        for names, func, *extra in cases:
            for name in names:
                try:
                    func(name, *extra)
                except OSError as err:
                    # must be the identical object, not merely an equal one
                    self.assertIs(err.filename, name)
                    continue
                self.fail("No exception thrown by {}".format(func))
2464
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() is either None (count unknown) or a positive int
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2473
Victor Stinnerdaf45552013-08-28 00:53:59 +02002474
class FDInheritanceTests(unittest.TestCase):
    # Checks the "inheritable" flag of file descriptors created through
    # os.open(), os.pipe(), os.dup(), os.dup2() and os.openpty().

    def _open_this_file(self):
        """Open this source file read-only; the descriptor is closed on cleanup."""
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        cleared = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, cleared)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec_before = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec_before, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        cloexec_after = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec_after, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        rfd, wfd = os.pipe()
        self.addCleanup(os.close, rfd)
        self.addCleanup(os.close, wfd)
        for end in (rfd, wfd):
            self.assertEqual(os.get_inheritable(end), False)

    def test_dup(self):
        original = self._open_this_file()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = self._open_this_file()

        # inheritable by default
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source_fd, target)
            self.assertEqual(os.get_inheritable(target), True)
        finally:
            os.close(target)

        # force non-inheritable
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source_fd, target, inheritable=False)
            self.assertEqual(os.get_inheritable(target), False)
        finally:
            os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        for end in (master_fd, slave_fd):
            self.assertEqual(os.get_inheritable(end), False)
2557
2558
@support.reap_threads
def test_main():
    """Run every test class of this module through support.run_unittest()."""
    # Passed to the runner in exactly this order.
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        URandomFDTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32ListdirTests,
        Win32SymlinkTests,
        NonLocalSymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
        OSErrorTests,
        RemoveDirsTests,
        CPUCountTests,
        FDInheritanceTests,
    )
    support.run_unittest(*test_classes)
Fred Drake2e2be372001-09-20 21:33:42 +00002595
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()