blob: 7d5ee69c45e41efade09ac68870c683502c3c24f [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070027import decimal
28import fractions
Christian Heimes25827622013-10-12 01:27:08 +020029import pickle
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000030try:
31 import threading
32except ImportError:
33 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020046
Georg Brandl2daf6ae2012-02-20 19:54:16 +010047from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000048
Victor Stinner034d0aa2012-06-05 01:22:15 +020049with warnings.catch_warnings():
50 warnings.simplefilter("ignore", DeprecationWarning)
51 os.stat_float_times(True)
Victor Stinner1aa54a42012-02-08 04:09:37 +010052st = os.stat(__file__)
53stat_supports_subsecond = (
54 # check if float and int timestamps are different
55 (st.st_atime != st[7])
56 or (st.st_mtime != st[8])
57 or (st.st_ctime != st[9]))
58
Mark Dickinson7cf03892010-04-16 13:45:35 +000059# Detect whether we're on a Linux system that uses the (now outdated
60# and unmaintained) linuxthreads threading library. There's an issue
61# when combining linuxthreads with a failed execv call: see
62# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020063if hasattr(sys, 'thread_info') and sys.thread_info.version:
64 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
65else:
66 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000067
Stefan Krahebee49a2013-01-17 15:31:00 +010068# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
69HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
70
Thomas Wouters0e3f5912006-08-11 14:57:12 +000071# Tests creating TESTFN
72class FileTests(unittest.TestCase):
73 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000074 if os.path.exists(support.TESTFN):
75 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000076 tearDown = setUp
77
78 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000079 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000080 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000081 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000082
Christian Heimesfdab48e2008-01-20 09:06:41 +000083 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000084 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
85 # We must allocate two consecutive file descriptors, otherwise
86 # it will mess up other file descriptors (perhaps even the three
87 # standard ones).
88 second = os.dup(first)
89 try:
90 retries = 0
91 while second != first + 1:
92 os.close(first)
93 retries += 1
94 if retries > 10:
95 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000096 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000097 first, second = second, os.dup(second)
98 finally:
99 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000100 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000101 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000102 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000103
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000104 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000105 def test_rename(self):
106 path = support.TESTFN
107 old = sys.getrefcount(path)
108 self.assertRaises(TypeError, os.rename, path, 0)
109 new = sys.getrefcount(path)
110 self.assertEqual(old, new)
111
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000112 def test_read(self):
113 with open(support.TESTFN, "w+b") as fobj:
114 fobj.write(b"spam")
115 fobj.flush()
116 fd = fobj.fileno()
117 os.lseek(fd, 0, 0)
118 s = os.read(fd, 4)
119 self.assertEqual(type(s), bytes)
120 self.assertEqual(s, b"spam")
121
122 def test_write(self):
123 # os.write() accepts bytes- and buffer-like objects but not strings
124 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
125 self.assertRaises(TypeError, os.write, fd, "beans")
126 os.write(fd, b"bacon\n")
127 os.write(fd, bytearray(b"eggs\n"))
128 os.write(fd, memoryview(b"spam\n"))
129 os.close(fd)
130 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000131 self.assertEqual(fobj.read().splitlines(),
132 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000133
Victor Stinnere0daff12011-03-20 23:36:35 +0100134 def write_windows_console(self, *args):
135 retcode = subprocess.call(args,
136 # use a new console to not flood the test output
137 creationflags=subprocess.CREATE_NEW_CONSOLE,
138 # use a shell to hide the console window (SW_HIDE)
139 shell=True)
140 self.assertEqual(retcode, 0)
141
142 @unittest.skipUnless(sys.platform == 'win32',
143 'test specific to the Windows console')
144 def test_write_windows_console(self):
145 # Issue #11395: the Windows console returns an error (12: not enough
146 # space error) on writing into stdout if stdout mode is binary and the
147 # length is greater than 66,000 bytes (or less, depending on heap
148 # usage).
149 code = "print('x' * 100000)"
150 self.write_windows_console(sys.executable, "-c", code)
151 self.write_windows_console(sys.executable, "-u", "-c", code)
152
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000153 def fdopen_helper(self, *args):
154 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200155 f = os.fdopen(fd, *args)
156 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000157
158 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200159 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
160 os.close(fd)
161
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000162 self.fdopen_helper()
163 self.fdopen_helper('r')
164 self.fdopen_helper('r', 100)
165
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100166 def test_replace(self):
167 TESTFN2 = support.TESTFN + ".2"
168 with open(support.TESTFN, 'w') as f:
169 f.write("1")
170 with open(TESTFN2, 'w') as f:
171 f.write("2")
172 self.addCleanup(os.unlink, TESTFN2)
173 os.replace(support.TESTFN, TESTFN2)
174 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
175 with open(TESTFN2, 'r') as f:
176 self.assertEqual(f.read(), "1")
177
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200178
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000179# Test attributes on return values from os.*stat* family.
180class StatAttributeTests(unittest.TestCase):
181 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000182 os.mkdir(support.TESTFN)
183 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000184 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000185 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000186 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000187
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000188 def tearDown(self):
189 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000190 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000191
Serhiy Storchaka43767632013-11-03 21:31:38 +0200192 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000193 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000194 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000195
196 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000197 self.assertEqual(result[stat.ST_SIZE], 3)
198 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000199
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000200 # Make sure all the attributes are there
201 members = dir(result)
202 for name in dir(stat):
203 if name[:3] == 'ST_':
204 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000205 if name.endswith("TIME"):
206 def trunc(x): return int(x)
207 else:
208 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000209 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000210 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000211 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000212
Larry Hastings6fe20b32012-04-19 15:07:49 -0700213 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700214 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700215 for name in 'st_atime st_mtime st_ctime'.split():
216 floaty = int(getattr(result, name) * 100000)
217 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700218 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700219
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000220 try:
221 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200222 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000223 except IndexError:
224 pass
225
226 # Make sure that assignment fails
227 try:
228 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200229 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000230 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000231 pass
232
233 try:
234 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200235 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000236 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000237 pass
238
239 try:
240 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200241 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000242 except AttributeError:
243 pass
244
245 # Use the stat_result constructor with a too-short tuple.
246 try:
247 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200248 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249 except TypeError:
250 pass
251
Ezio Melotti42da6632011-03-15 05:18:48 +0200252 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000253 try:
254 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
255 except TypeError:
256 pass
257
Antoine Pitrou38425292010-09-21 18:19:07 +0000258 def test_stat_attributes(self):
259 self.check_stat_attributes(self.fname)
260
261 def test_stat_attributes_bytes(self):
262 try:
263 fname = self.fname.encode(sys.getfilesystemencoding())
264 except UnicodeEncodeError:
265 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100266 with warnings.catch_warnings():
267 warnings.simplefilter("ignore", DeprecationWarning)
268 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000269
Christian Heimes25827622013-10-12 01:27:08 +0200270 def test_stat_result_pickle(self):
271 result = os.stat(self.fname)
272 p = pickle.dumps(result)
273 self.assertIn(b'\x03cos\nstat_result\n', p)
274 unpickled = pickle.loads(p)
275 self.assertEqual(result, unpickled)
276
Serhiy Storchaka43767632013-11-03 21:31:38 +0200277 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000278 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000279 try:
280 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000281 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000282 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000283 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200284 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000285
286 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000287 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000288
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000289 # Make sure all the attributes are there.
290 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
291 'ffree', 'favail', 'flag', 'namemax')
292 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000293 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000294
295 # Make sure that assignment really fails
296 try:
297 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200298 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000299 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000300 pass
301
302 try:
303 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200304 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000305 except AttributeError:
306 pass
307
308 # Use the constructor with a too-short tuple.
309 try:
310 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200311 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000312 except TypeError:
313 pass
314
Ezio Melotti42da6632011-03-15 05:18:48 +0200315 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000316 try:
317 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
318 except TypeError:
319 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000320
Christian Heimes25827622013-10-12 01:27:08 +0200321 @unittest.skipUnless(hasattr(os, 'statvfs'),
322 "need os.statvfs()")
323 def test_statvfs_result_pickle(self):
324 try:
325 result = os.statvfs(self.fname)
326 except OSError as e:
327 # On AtheOS, glibc always returns ENOSYS
328 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200329 self.skipTest('os.statvfs() failed with ENOSYS')
330
Christian Heimes25827622013-10-12 01:27:08 +0200331 p = pickle.dumps(result)
332 self.assertIn(b'\x03cos\nstatvfs_result\n', p)
333 unpickled = pickle.loads(p)
334 self.assertEqual(result, unpickled)
335
Thomas Wouters89f507f2006-12-13 04:49:30 +0000336 def test_utime_dir(self):
337 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000338 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000339 # round to int, because some systems may support sub-second
340 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000341 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
342 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000343 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000344
Larry Hastings76ad59b2012-05-03 00:30:07 -0700345 def _test_utime(self, filename, attr, utime, delta):
Brian Curtin0277aa32011-11-06 13:50:15 -0600346 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600347 # second argument. Check that the previous methods of passing
348 # a time tuple or None work in addition to no argument.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700349 st0 = os.stat(filename)
Brian Curtin52fbea12011-11-06 13:41:17 -0600350 # Doesn't set anything new, but sets the time tuple way
Larry Hastings76ad59b2012-05-03 00:30:07 -0700351 utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
352 # Setting the time to the time you just read, then reading again,
353 # should always return exactly the same times.
354 st1 = os.stat(filename)
355 self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
356 self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
Brian Curtin52fbea12011-11-06 13:41:17 -0600357 # Set to the current time in the old explicit way.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700358 os.utime(filename, None)
Brian Curtin52fbea12011-11-06 13:41:17 -0600359 st2 = os.stat(support.TESTFN)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700360 # Set to the current time in the new way
361 os.utime(filename)
362 st3 = os.stat(filename)
363 self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)
364
365 def test_utime(self):
366 def utime(file, times):
367 return os.utime(file, times)
368 self._test_utime(self.fname, getattr, utime, 10)
369 self._test_utime(support.TESTFN, getattr, utime, 10)
370
371
372 def _test_utime_ns(self, set_times_ns, test_dir=True):
373 def getattr_ns(o, attr):
374 return getattr(o, attr + "_ns")
375 ten_s = 10 * 1000 * 1000 * 1000
376 self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
377 if test_dir:
378 self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)
379
380 def test_utime_ns(self):
381 def utime_ns(file, times):
382 return os.utime(file, ns=times)
383 self._test_utime_ns(utime_ns)
384
Larry Hastings9cf065c2012-06-22 16:30:09 -0700385 requires_utime_dir_fd = unittest.skipUnless(
386 os.utime in os.supports_dir_fd,
387 "dir_fd support for utime required for this test.")
388 requires_utime_fd = unittest.skipUnless(
389 os.utime in os.supports_fd,
390 "fd support for utime required for this test.")
391 requires_utime_nofollow_symlinks = unittest.skipUnless(
392 os.utime in os.supports_follow_symlinks,
393 "follow_symlinks support for utime required for this test.")
Larry Hastings76ad59b2012-05-03 00:30:07 -0700394
Larry Hastings9cf065c2012-06-22 16:30:09 -0700395 @requires_utime_nofollow_symlinks
Larry Hastings76ad59b2012-05-03 00:30:07 -0700396 def test_lutimes_ns(self):
397 def lutimes_ns(file, times):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700398 return os.utime(file, ns=times, follow_symlinks=False)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700399 self._test_utime_ns(lutimes_ns)
400
Larry Hastings9cf065c2012-06-22 16:30:09 -0700401 @requires_utime_fd
Larry Hastings76ad59b2012-05-03 00:30:07 -0700402 def test_futimes_ns(self):
403 def futimes_ns(file, times):
404 with open(file, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700405 os.utime(f.fileno(), ns=times)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700406 self._test_utime_ns(futimes_ns, test_dir=False)
407
408 def _utime_invalid_arguments(self, name, arg):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700409 with self.assertRaises(ValueError):
Larry Hastings76ad59b2012-05-03 00:30:07 -0700410 getattr(os, name)(arg, (5, 5), ns=(5, 5))
411
412 def test_utime_invalid_arguments(self):
413 self._utime_invalid_arguments('utime', self.fname)
414
Brian Curtin52fbea12011-11-06 13:41:17 -0600415
Victor Stinner1aa54a42012-02-08 04:09:37 +0100416 @unittest.skipUnless(stat_supports_subsecond,
417 "os.stat() doesn't has a subsecond resolution")
Victor Stinnera2f7c002012-02-08 03:36:25 +0100418 def _test_utime_subsecond(self, set_time_func):
Victor Stinnerbe557de2012-02-08 03:01:11 +0100419 asec, amsec = 1, 901
420 atime = asec + amsec * 1e-3
Victor Stinnera2f7c002012-02-08 03:36:25 +0100421 msec, mmsec = 2, 901
Victor Stinnerbe557de2012-02-08 03:01:11 +0100422 mtime = msec + mmsec * 1e-3
423 filename = self.fname
Victor Stinnera2f7c002012-02-08 03:36:25 +0100424 os.utime(filename, (0, 0))
425 set_time_func(filename, atime, mtime)
Victor Stinner034d0aa2012-06-05 01:22:15 +0200426 with warnings.catch_warnings():
427 warnings.simplefilter("ignore", DeprecationWarning)
428 os.stat_float_times(True)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100429 st = os.stat(filename)
430 self.assertAlmostEqual(st.st_atime, atime, places=3)
431 self.assertAlmostEqual(st.st_mtime, mtime, places=3)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100432
Victor Stinnera2f7c002012-02-08 03:36:25 +0100433 def test_utime_subsecond(self):
434 def set_time(filename, atime, mtime):
435 os.utime(filename, (atime, mtime))
436 self._test_utime_subsecond(set_time)
437
Larry Hastings9cf065c2012-06-22 16:30:09 -0700438 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100439 def test_futimes_subsecond(self):
440 def set_time(filename, atime, mtime):
441 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700442 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100443 self._test_utime_subsecond(set_time)
444
Larry Hastings9cf065c2012-06-22 16:30:09 -0700445 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100446 def test_futimens_subsecond(self):
447 def set_time(filename, atime, mtime):
448 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700449 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100450 self._test_utime_subsecond(set_time)
451
Larry Hastings9cf065c2012-06-22 16:30:09 -0700452 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100453 def test_futimesat_subsecond(self):
454 def set_time(filename, atime, mtime):
455 dirname = os.path.dirname(filename)
456 dirfd = os.open(dirname, os.O_RDONLY)
457 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700458 os.utime(os.path.basename(filename), dir_fd=dirfd,
459 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100460 finally:
461 os.close(dirfd)
462 self._test_utime_subsecond(set_time)
463
Larry Hastings9cf065c2012-06-22 16:30:09 -0700464 @requires_utime_nofollow_symlinks
Victor Stinnera2f7c002012-02-08 03:36:25 +0100465 def test_lutimes_subsecond(self):
466 def set_time(filename, atime, mtime):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700467 os.utime(filename, (atime, mtime), follow_symlinks=False)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100468 self._test_utime_subsecond(set_time)
469
Larry Hastings9cf065c2012-06-22 16:30:09 -0700470 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100471 def test_utimensat_subsecond(self):
472 def set_time(filename, atime, mtime):
473 dirname = os.path.dirname(filename)
474 dirfd = os.open(dirname, os.O_RDONLY)
475 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700476 os.utime(os.path.basename(filename), dir_fd=dirfd,
477 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100478 finally:
479 os.close(dirfd)
480 self._test_utime_subsecond(set_time)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100481
Serhiy Storchaka43767632013-11-03 21:31:38 +0200482 # Restrict tests to Win32, since there is no guarantee other
Thomas Wouters89f507f2006-12-13 04:49:30 +0000483 # systems support centiseconds
Serhiy Storchaka43767632013-11-03 21:31:38 +0200484 def get_file_system(path):
485 if sys.platform == 'win32':
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000486 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000487 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000488 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000489 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000490 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000491 return buf.value
492
Serhiy Storchaka43767632013-11-03 21:31:38 +0200493 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
494 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
495 "requires NTFS")
496 def test_1565150(self):
497 t1 = 1159195039.25
498 os.utime(self.fname, (t1, t1))
499 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000500
Serhiy Storchaka43767632013-11-03 21:31:38 +0200501 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
502 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
503 "requires NTFS")
504 def test_large_time(self):
505 t1 = 5000000000 # some day in 2128
506 os.utime(self.fname, (t1, t1))
507 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000508
Serhiy Storchaka43767632013-11-03 21:31:38 +0200509 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
510 def test_1686475(self):
511 # Verify that an open file can be stat'ed
512 try:
513 os.stat(r"c:\pagefile.sys")
514 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600515 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200516 except OSError as e:
517 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000518
Serhiy Storchaka43767632013-11-03 21:31:38 +0200519 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
520 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
521 def test_15261(self):
522 # Verify that stat'ing a closed fd does not cause crash
523 r, w = os.pipe()
524 try:
525 os.stat(r) # should not raise error
526 finally:
527 os.close(r)
528 os.close(w)
529 with self.assertRaises(OSError) as ctx:
530 os.stat(r)
531 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100532
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000533from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000534
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000535class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000536 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000537 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000538
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000539 def setUp(self):
540 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000541 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000542 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000543 for key, value in self._reference().items():
544 os.environ[key] = value
545
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000546 def tearDown(self):
547 os.environ.clear()
548 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000549 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000550 os.environb.clear()
551 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000552
Christian Heimes90333392007-11-01 19:08:42 +0000553 def _reference(self):
554 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
555
556 def _empty_mapping(self):
557 os.environ.clear()
558 return os.environ
559
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000560 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300561 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000562 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000563 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300564 os.environ.update(HELLO="World")
565 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
566 value = popen.read().strip()
567 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000568
Ezio Melottic7e139b2012-09-26 20:01:34 +0300569 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000570 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300571 with os.popen(
572 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
573 it = iter(popen)
574 self.assertEqual(next(it), "line1\n")
575 self.assertEqual(next(it), "line2\n")
576 self.assertEqual(next(it), "line3\n")
577 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000578
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000579 # Verify environ keys and values from the OS are of the
580 # correct str type.
581 def test_keyvalue_types(self):
582 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000583 self.assertEqual(type(key), str)
584 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000585
Christian Heimes90333392007-11-01 19:08:42 +0000586 def test_items(self):
587 for key, value in self._reference().items():
588 self.assertEqual(os.environ.get(key), value)
589
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000590 # Issue 7310
591 def test___repr__(self):
592 """Check that the repr() of os.environ looks like environ({...})."""
593 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000594 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
595 '{!r}: {!r}'.format(key, value)
596 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000597
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000598 def test_get_exec_path(self):
599 defpath_list = os.defpath.split(os.pathsep)
600 test_path = ['/monty', '/python', '', '/flying/circus']
601 test_env = {'PATH': os.pathsep.join(test_path)}
602
603 saved_environ = os.environ
604 try:
605 os.environ = dict(test_env)
606 # Test that defaulting to os.environ works.
607 self.assertSequenceEqual(test_path, os.get_exec_path())
608 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
609 finally:
610 os.environ = saved_environ
611
612 # No PATH environment variable
613 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
614 # Empty PATH environment variable
615 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
616 # Supplied PATH environment variable
617 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
618
Victor Stinnerb745a742010-05-18 17:17:23 +0000619 if os.supports_bytes_environ:
620 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000621 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000622 # ignore BytesWarning warning
623 with warnings.catch_warnings(record=True):
624 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000625 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000626 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000627 pass
628 else:
629 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000630
631 # bytes key and/or value
632 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
633 ['abc'])
634 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
635 ['abc'])
636 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
637 ['abc'])
638
639 @unittest.skipUnless(os.supports_bytes_environ,
640 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000641 def test_environb(self):
642 # os.environ -> os.environb
643 value = 'euro\u20ac'
644 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000645 value_bytes = value.encode(sys.getfilesystemencoding(),
646 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000647 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000648 msg = "U+20AC character is not encodable to %s" % (
649 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000650 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000651 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000652 self.assertEqual(os.environ['unicode'], value)
653 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000654
655 # os.environb -> os.environ
656 value = b'\xff'
657 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000658 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000659 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000660 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000661
Charles-François Natali2966f102011-11-26 11:32:46 +0100662 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
663 # #13415).
664 @support.requires_freebsd_version(7)
665 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100666 def test_unset_error(self):
667 if sys.platform == "win32":
668 # an environment variable is limited to 32,767 characters
669 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100670 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100671 else:
672 # "=" is not allowed in a variable name
673 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100674 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100675
Victor Stinner6d101392013-04-14 16:35:04 +0200676 def test_key_type(self):
677 missing = 'missingkey'
678 self.assertNotIn(missing, os.environ)
679
Victor Stinner839e5ea2013-04-14 16:43:03 +0200680 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200681 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200682 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200683 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200684
Victor Stinner839e5ea2013-04-14 16:43:03 +0200685 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200686 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200687 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200688 self.assertTrue(cm.exception.__suppress_context__)
689
Victor Stinner6d101392013-04-14 16:35:04 +0200690
Tim Petersc4e09402003-04-25 07:11:48 +0000691class WalkTests(unittest.TestCase):
692 """Tests for os.walk()."""
693
Charles-François Natali7372b062012-02-05 15:15:38 +0100694 def setUp(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000695 import os
696 from os.path import join
697
698 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000699 # TESTFN/
700 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000701 # tmp1
702 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000703 # tmp2
704 # SUB11/ no kids
705 # SUB2/ a file kid and a dirsymlink kid
706 # tmp3
707 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200708 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000709 # TEST2/
710 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000711 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000712 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000713 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000714 sub2_path = join(walk_path, "SUB2")
715 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000716 tmp2_path = join(sub1_path, "tmp2")
717 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000718 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000719 t2_path = join(support.TESTFN, "TEST2")
720 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200721 link_path = join(sub2_path, "link")
722 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000723
724 # Create stuff.
725 os.makedirs(sub11_path)
726 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000727 os.makedirs(t2_path)
728 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000729 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000730 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
731 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000732 if support.can_symlink():
Jason R. Coombs3a092862013-05-27 23:21:28 -0400733 os.symlink(os.path.abspath(t2_path), link_path)
Jason R. Coombsb501b562013-05-27 23:52:43 -0400734 os.symlink('broken', broken_link_path, True)
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200735 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
Guido van Rossumd8faa362007-04-27 19:54:29 +0000736 else:
737 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000738
739 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000740 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000741 self.assertEqual(len(all), 4)
742 # We can't know which order SUB1 and SUB2 will appear in.
743 # Not flipped: TESTFN, SUB1, SUB11, SUB2
744 # flipped: TESTFN, SUB2, SUB1, SUB11
745 flipped = all[0][1][0] != "SUB1"
746 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200747 all[3 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000748 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000749 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
750 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000751 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000752
753 # Prune the search.
754 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000755 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000756 all.append((root, dirs, files))
757 # Don't descend into SUB1.
758 if 'SUB1' in dirs:
759 # Note that this also mutates the dirs we appended to all!
760 dirs.remove('SUB1')
761 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000762 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200763 all[1][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000764 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000765
766 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000767 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000768 self.assertEqual(len(all), 4)
769 # We can't know which order SUB1 and SUB2 will appear in.
770 # Not flipped: SUB11, SUB1, SUB2, TESTFN
771 # flipped: SUB2, SUB11, SUB1, TESTFN
772 flipped = all[3][1][0] != "SUB1"
773 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200774 all[2 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000775 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000776 self.assertEqual(all[flipped], (sub11_path, [], []))
777 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000778 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000779
Brian Curtin3b4499c2010-12-28 14:31:47 +0000780 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000781 # Walk, following symlinks.
782 for root, dirs, files in os.walk(walk_path, followlinks=True):
783 if root == link_path:
784 self.assertEqual(dirs, [])
785 self.assertEqual(files, ["tmp4"])
786 break
787 else:
788 self.fail("Didn't follow symlink with followlinks=True")
789
790 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000791 # Tear everything down. This is a decent use for bottom-up on
792 # Windows, which doesn't have a recursive delete command. The
793 # (not so) subtlety is that rmdir will fail unless the dir's
794 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000795 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000796 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000797 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000798 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000799 dirname = os.path.join(root, name)
800 if not os.path.islink(dirname):
801 os.rmdir(dirname)
802 else:
803 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000804 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000805
Charles-François Natali7372b062012-02-05 15:15:38 +0100806
807@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
808class FwalkTests(WalkTests):
809 """Tests for os.fwalk()."""
810
Larry Hastingsc48fe982012-06-25 04:49:05 -0700811 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
812 """
813 compare with walk() results.
814 """
Larry Hastingsb4038062012-07-15 10:57:38 -0700815 walk_kwargs = walk_kwargs.copy()
816 fwalk_kwargs = fwalk_kwargs.copy()
817 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
818 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
819 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -0700820
Charles-François Natali7372b062012-02-05 15:15:38 +0100821 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -0700822 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100823 expected[root] = (set(dirs), set(files))
824
Larry Hastingsc48fe982012-06-25 04:49:05 -0700825 for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100826 self.assertIn(root, expected)
827 self.assertEqual(expected[root], (set(dirs), set(files)))
828
Larry Hastingsc48fe982012-06-25 04:49:05 -0700829 def test_compare_to_walk(self):
830 kwargs = {'top': support.TESTFN}
831 self._compare_to_walk(kwargs, kwargs)
832
Charles-François Natali7372b062012-02-05 15:15:38 +0100833 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -0700834 try:
835 fd = os.open(".", os.O_RDONLY)
836 walk_kwargs = {'top': support.TESTFN}
837 fwalk_kwargs = walk_kwargs.copy()
838 fwalk_kwargs['dir_fd'] = fd
839 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
840 finally:
841 os.close(fd)
842
843 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +0100844 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -0700845 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
846 args = support.TESTFN, topdown, None
847 for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +0100848 # check that the FD is valid
849 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700850 # redundant check
851 os.stat(rootfd)
852 # check that listdir() returns consistent information
853 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100854
855 def test_fd_leak(self):
856 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
857 # we both check that calling fwalk() a large number of times doesn't
858 # yield EMFILE, and that the minimum allocated FD hasn't changed.
859 minfd = os.dup(1)
860 os.close(minfd)
861 for i in range(256):
862 for x in os.fwalk(support.TESTFN):
863 pass
864 newfd = os.dup(1)
865 self.addCleanup(os.close, newfd)
866 self.assertEqual(newfd, minfd)
867
868 def tearDown(self):
869 # cleanup
870 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
871 for name in files:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700872 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100873 for name in dirs:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700874 st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
Larry Hastingsb698d8e2012-06-23 16:55:07 -0700875 if stat.S_ISDIR(st.st_mode):
876 os.rmdir(name, dir_fd=rootfd)
877 else:
878 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100879 os.rmdir(support.TESTFN)
880
881
Guido van Rossume7ba4952007-06-06 23:52:48 +0000882class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000883 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000884 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000885
886 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000887 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000888 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
889 os.makedirs(path) # Should work
890 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
891 os.makedirs(path)
892
893 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000894 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000895 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
896 os.makedirs(path)
897 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
898 'dir5', 'dir6')
899 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000900
Terry Reedy5a22b652010-12-02 07:05:56 +0000901 def test_exist_ok_existing_directory(self):
902 path = os.path.join(support.TESTFN, 'dir1')
903 mode = 0o777
904 old_mask = os.umask(0o022)
905 os.makedirs(path, mode)
906 self.assertRaises(OSError, os.makedirs, path, mode)
907 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -0400908 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +0000909 os.makedirs(path, mode=mode, exist_ok=True)
910 os.umask(old_mask)
911
Terry Jan Reedy4a0b6f72013-08-10 20:58:59 -0400912 @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
Larry Hastingsa27b83a2013-08-08 00:19:50 -0700913 def test_chown_uid_gid_arguments_must_be_index(self):
914 stat = os.stat(support.TESTFN)
915 uid = stat.st_uid
916 gid = stat.st_gid
917 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
918 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
919 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
920 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
921 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
922
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700923 def test_exist_ok_s_isgid_directory(self):
924 path = os.path.join(support.TESTFN, 'dir1')
925 S_ISGID = stat.S_ISGID
926 mode = 0o777
927 old_mask = os.umask(0o022)
928 try:
929 existing_testfn_mode = stat.S_IMODE(
930 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -0700931 try:
932 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -0700933 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -0700934 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700935 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
936 raise unittest.SkipTest('No support for S_ISGID dir mode.')
937 # The os should apply S_ISGID from the parent dir for us, but
938 # this test need not depend on that behavior. Be explicit.
939 os.makedirs(path, mode | S_ISGID)
940 # http://bugs.python.org/issue14992
941 # Should not fail when the bit is already set.
942 os.makedirs(path, mode, exist_ok=True)
943 # remove the bit.
944 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -0400945 # May work even when the bit is not already set when demanded.
946 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700947 finally:
948 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +0000949
950 def test_exist_ok_existing_regular_file(self):
951 base = support.TESTFN
952 path = os.path.join(support.TESTFN, 'dir1')
953 f = open(path, 'w')
954 f.write('abc')
955 f.close()
956 self.assertRaises(OSError, os.makedirs, path)
957 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
958 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
959 os.remove(path)
960
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000961 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000962 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000963 'dir4', 'dir5', 'dir6')
964 # If the tests failed, the bottom-most directory ('../dir6')
965 # may not have been created, so we look for the outermost directory
966 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000967 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000968 path = os.path.dirname(path)
969
970 os.removedirs(path)
971
Andrew Svetlov405faed2012-12-25 12:18:09 +0200972
973class RemoveDirsTests(unittest.TestCase):
974 def setUp(self):
975 os.makedirs(support.TESTFN)
976
977 def tearDown(self):
978 support.rmtree(support.TESTFN)
979
980 def test_remove_all(self):
981 dira = os.path.join(support.TESTFN, 'dira')
982 os.mkdir(dira)
983 dirb = os.path.join(dira, 'dirb')
984 os.mkdir(dirb)
985 os.removedirs(dirb)
986 self.assertFalse(os.path.exists(dirb))
987 self.assertFalse(os.path.exists(dira))
988 self.assertFalse(os.path.exists(support.TESTFN))
989
990 def test_remove_partial(self):
991 dira = os.path.join(support.TESTFN, 'dira')
992 os.mkdir(dira)
993 dirb = os.path.join(dira, 'dirb')
994 os.mkdir(dirb)
995 with open(os.path.join(dira, 'file.txt'), 'w') as f:
996 f.write('text')
997 os.removedirs(dirb)
998 self.assertFalse(os.path.exists(dirb))
999 self.assertTrue(os.path.exists(dira))
1000 self.assertTrue(os.path.exists(support.TESTFN))
1001
1002 def test_remove_nothing(self):
1003 dira = os.path.join(support.TESTFN, 'dira')
1004 os.mkdir(dira)
1005 dirb = os.path.join(dira, 'dirb')
1006 os.mkdir(dirb)
1007 with open(os.path.join(dirb, 'file.txt'), 'w') as f:
1008 f.write('text')
1009 with self.assertRaises(OSError):
1010 os.removedirs(dirb)
1011 self.assertTrue(os.path.exists(dirb))
1012 self.assertTrue(os.path.exists(dira))
1013 self.assertTrue(os.path.exists(support.TESTFN))
1014
1015
Guido van Rossume7ba4952007-06-06 23:52:48 +00001016class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001017 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +02001018 with open(os.devnull, 'wb') as f:
1019 f.write(b'hello')
1020 f.close()
1021 with open(os.devnull, 'rb') as f:
1022 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001023
Andrew Svetlov405faed2012-12-25 12:18:09 +02001024
Guido van Rossume7ba4952007-06-06 23:52:48 +00001025class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001026 def test_urandom_length(self):
1027 self.assertEqual(len(os.urandom(0)), 0)
1028 self.assertEqual(len(os.urandom(1)), 1)
1029 self.assertEqual(len(os.urandom(10)), 10)
1030 self.assertEqual(len(os.urandom(100)), 100)
1031 self.assertEqual(len(os.urandom(1000)), 1000)
1032
1033 def test_urandom_value(self):
1034 data1 = os.urandom(16)
1035 data2 = os.urandom(16)
1036 self.assertNotEqual(data1, data2)
1037
1038 def get_urandom_subprocess(self, count):
1039 code = '\n'.join((
1040 'import os, sys',
1041 'data = os.urandom(%s)' % count,
1042 'sys.stdout.buffer.write(data)',
1043 'sys.stdout.buffer.flush()'))
1044 out = assert_python_ok('-c', code)
1045 stdout = out[1]
1046 self.assertEqual(len(stdout), 16)
1047 return stdout
1048
1049 def test_urandom_subprocess(self):
1050 data1 = self.get_urandom_subprocess(16)
1051 data2 = self.get_urandom_subprocess(16)
1052 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001053
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001054 @unittest.skipUnless(resource, "test requires the resource module")
1055 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001056 # Check urandom() failing when it is not able to open /dev/random.
1057 # We spawn a new process to make the test more robust (if getrlimit()
1058 # failed to restore the file descriptor limit after this, the whole
1059 # test suite would crash; this actually happened on the OS X Tiger
1060 # buildbot).
1061 code = """if 1:
1062 import errno
1063 import os
1064 import resource
1065
1066 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1067 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1068 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001069 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001070 except OSError as e:
1071 assert e.errno == errno.EMFILE, e.errno
1072 else:
1073 raise AssertionError("OSError not raised")
1074 """
1075 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001076
Antoine Pitroue472aea2014-04-26 14:33:03 +02001077 def test_urandom_fd_closed(self):
1078 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1079 # closed.
1080 code = """if 1:
1081 import os
1082 import sys
1083 os.urandom(4)
1084 os.closerange(3, 256)
1085 sys.stdout.buffer.write(os.urandom(4))
1086 """
1087 rc, out, err = assert_python_ok('-Sc', code)
1088
1089 def test_urandom_fd_reopened(self):
1090 # Issue #21207: urandom() should detect its fd to /dev/urandom
1091 # changed to something else, and reopen it.
1092 with open(support.TESTFN, 'wb') as f:
1093 f.write(b"x" * 256)
1094 self.addCleanup(os.unlink, support.TESTFN)
1095 code = """if 1:
1096 import os
1097 import sys
1098 os.urandom(4)
1099 for fd in range(3, 256):
1100 try:
1101 os.close(fd)
1102 except OSError:
1103 pass
1104 else:
1105 # Found the urandom fd (XXX hopefully)
1106 break
1107 os.closerange(3, 256)
1108 with open({TESTFN!r}, 'rb') as f:
1109 os.dup2(f.fileno(), fd)
1110 sys.stdout.buffer.write(os.urandom(4))
1111 sys.stdout.buffer.write(os.urandom(4))
1112 """.format(TESTFN=support.TESTFN)
1113 rc, out, err = assert_python_ok('-Sc', code)
1114 self.assertEqual(len(out), 8)
1115 self.assertNotEqual(out[0:4], out[4:8])
1116 rc, out2, err2 = assert_python_ok('-Sc', code)
1117 self.assertEqual(len(out2), 8)
1118 self.assertNotEqual(out2, out)
1119
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001120
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001121@contextlib.contextmanager
1122def _execvpe_mockup(defpath=None):
1123 """
1124 Stubs out execv and execve functions when used as context manager.
1125 Records exec calls. The mock execv and execve functions always raise an
1126 exception as they would normally never return.
1127 """
1128 # A list of tuples containing (function name, first arg, args)
1129 # of calls to execv or execve that have been made.
1130 calls = []
1131
1132 def mock_execv(name, *args):
1133 calls.append(('execv', name, args))
1134 raise RuntimeError("execv called")
1135
1136 def mock_execve(name, *args):
1137 calls.append(('execve', name, args))
1138 raise OSError(errno.ENOTDIR, "execve called")
1139
1140 try:
1141 orig_execv = os.execv
1142 orig_execve = os.execve
1143 orig_defpath = os.defpath
1144 os.execv = mock_execv
1145 os.execve = mock_execve
1146 if defpath is not None:
1147 os.defpath = defpath
1148 yield calls
1149 finally:
1150 os.execv = orig_execv
1151 os.execve = orig_execve
1152 os.defpath = orig_defpath
1153
Guido van Rossume7ba4952007-06-06 23:52:48 +00001154class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001155 @unittest.skipIf(USING_LINUXTHREADS,
1156 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001157 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001158 self.assertRaises(OSError, os.execvpe, 'no such app-',
1159 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001160
Thomas Heller6790d602007-08-30 17:15:14 +00001161 def test_execvpe_with_bad_arglist(self):
1162 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1163
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001164 @unittest.skipUnless(hasattr(os, '_execvpe'),
1165 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001166 def _test_internal_execvpe(self, test_type):
1167 program_path = os.sep + 'absolutepath'
1168 if test_type is bytes:
1169 program = b'executable'
1170 fullpath = os.path.join(os.fsencode(program_path), program)
1171 native_fullpath = fullpath
1172 arguments = [b'progname', 'arg1', 'arg2']
1173 else:
1174 program = 'executable'
1175 arguments = ['progname', 'arg1', 'arg2']
1176 fullpath = os.path.join(program_path, program)
1177 if os.name != "nt":
1178 native_fullpath = os.fsencode(fullpath)
1179 else:
1180 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001181 env = {'spam': 'beans'}
1182
Victor Stinnerb745a742010-05-18 17:17:23 +00001183 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001184 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001185 self.assertRaises(RuntimeError,
1186 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001187 self.assertEqual(len(calls), 1)
1188 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1189
Victor Stinnerb745a742010-05-18 17:17:23 +00001190 # test os._execvpe() with a relative path:
1191 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001192 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001193 self.assertRaises(OSError,
1194 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001195 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001196 self.assertSequenceEqual(calls[0],
1197 ('execve', native_fullpath, (arguments, env)))
1198
1199 # test os._execvpe() with a relative path:
1200 # os.get_exec_path() reads the 'PATH' variable
1201 with _execvpe_mockup() as calls:
1202 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001203 if test_type is bytes:
1204 env_path[b'PATH'] = program_path
1205 else:
1206 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001207 self.assertRaises(OSError,
1208 os._execvpe, program, arguments, env=env_path)
1209 self.assertEqual(len(calls), 1)
1210 self.assertSequenceEqual(calls[0],
1211 ('execve', native_fullpath, (arguments, env_path)))
1212
1213 def test_internal_execvpe_str(self):
1214 self._test_internal_execvpe(str)
1215 if os.name != "nt":
1216 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001217
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001218
Serhiy Storchaka43767632013-11-03 21:31:38 +02001219@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001220class Win32ErrorTests(unittest.TestCase):
1221 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001222 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001223
1224 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001225 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001226
1227 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001228 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001229
1230 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001231 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001232 try:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001233 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Benjamin Petersonf91df042009-02-13 02:50:59 +00001234 finally:
1235 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001236 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001237
1238 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001239 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001240
Thomas Wouters477c8d52006-05-27 19:21:47 +00001241 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001242 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001243
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001244class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001245 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001246 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1247 #singles.append("close")
1248 #We omit close because it doesn'r raise an exception on some platforms
1249 def get_single(f):
1250 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001251 if hasattr(os, f):
1252 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001253 return helper
1254 for f in singles:
1255 locals()["test_"+f] = get_single(f)
1256
Benjamin Peterson7522c742009-01-19 21:00:09 +00001257 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001258 try:
1259 f(support.make_bad_fd(), *args)
1260 except OSError as e:
1261 self.assertEqual(e.errno, errno.EBADF)
1262 else:
1263 self.fail("%r didn't raise a OSError with a bad file descriptor"
1264 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001265
Serhiy Storchaka43767632013-11-03 21:31:38 +02001266 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001267 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001268 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001269
Serhiy Storchaka43767632013-11-03 21:31:38 +02001270 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001271 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001272 fd = support.make_bad_fd()
1273 # Make sure none of the descriptors we are about to close are
1274 # currently valid (issue 6542).
1275 for i in range(10):
1276 try: os.fstat(fd+i)
1277 except OSError:
1278 pass
1279 else:
1280 break
1281 if i < 2:
1282 raise unittest.SkipTest(
1283 "Unable to acquire a range of invalid file descriptors")
1284 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001285
Serhiy Storchaka43767632013-11-03 21:31:38 +02001286 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001287 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001288 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001289
Serhiy Storchaka43767632013-11-03 21:31:38 +02001290 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001291 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001292 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001293
Serhiy Storchaka43767632013-11-03 21:31:38 +02001294 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001295 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001296 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001297
Serhiy Storchaka43767632013-11-03 21:31:38 +02001298 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001299 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001300 self.check(os.pathconf, "PC_NAME_MAX")
1301 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001302
Serhiy Storchaka43767632013-11-03 21:31:38 +02001303 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001304 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001305 self.check(os.truncate, 0)
1306 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001307
Serhiy Storchaka43767632013-11-03 21:31:38 +02001308 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001309 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001310 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001311
Serhiy Storchaka43767632013-11-03 21:31:38 +02001312 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001313 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001314 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001315
Victor Stinner57ddf782014-01-08 15:21:28 +01001316 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1317 def test_readv(self):
1318 buf = bytearray(10)
1319 self.check(os.readv, [buf])
1320
Serhiy Storchaka43767632013-11-03 21:31:38 +02001321 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001322 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001323 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001324
Serhiy Storchaka43767632013-11-03 21:31:38 +02001325 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001326 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001327 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001328
Victor Stinner57ddf782014-01-08 15:21:28 +01001329 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1330 def test_writev(self):
1331 self.check(os.writev, [b'abc'])
1332
Brian Curtin1b9df392010-11-24 20:24:31 +00001333
1334class LinkTests(unittest.TestCase):
1335 def setUp(self):
1336 self.file1 = support.TESTFN
1337 self.file2 = os.path.join(support.TESTFN + "2")
1338
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001339 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001340 for file in (self.file1, self.file2):
1341 if os.path.exists(file):
1342 os.unlink(file)
1343
Brian Curtin1b9df392010-11-24 20:24:31 +00001344 def _test_link(self, file1, file2):
1345 with open(file1, "w") as f1:
1346 f1.write("test")
1347
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001348 with warnings.catch_warnings():
1349 warnings.simplefilter("ignore", DeprecationWarning)
1350 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001351 with open(file1, "r") as f1, open(file2, "r") as f2:
1352 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1353
1354 def test_link(self):
1355 self._test_link(self.file1, self.file2)
1356
1357 def test_link_bytes(self):
1358 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1359 bytes(self.file2, sys.getfilesystemencoding()))
1360
Brian Curtinf498b752010-11-30 15:54:04 +00001361 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001362 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001363 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001364 except UnicodeError:
1365 raise unittest.SkipTest("Unable to encode for this platform.")
1366
Brian Curtinf498b752010-11-30 15:54:04 +00001367 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001368 self.file2 = self.file1 + "2"
1369 self._test_link(self.file1, self.file2)
1370
Serhiy Storchaka43767632013-11-03 21:31:38 +02001371@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1372class PosixUidGidTests(unittest.TestCase):
1373 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1374 def test_setuid(self):
1375 if os.getuid() != 0:
1376 self.assertRaises(OSError, os.setuid, 0)
1377 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001378
Serhiy Storchaka43767632013-11-03 21:31:38 +02001379 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1380 def test_setgid(self):
1381 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1382 self.assertRaises(OSError, os.setgid, 0)
1383 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001384
Serhiy Storchaka43767632013-11-03 21:31:38 +02001385 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1386 def test_seteuid(self):
1387 if os.getuid() != 0:
1388 self.assertRaises(OSError, os.seteuid, 0)
1389 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001390
Serhiy Storchaka43767632013-11-03 21:31:38 +02001391 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1392 def test_setegid(self):
1393 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1394 self.assertRaises(OSError, os.setegid, 0)
1395 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001396
Serhiy Storchaka43767632013-11-03 21:31:38 +02001397 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1398 def test_setreuid(self):
1399 if os.getuid() != 0:
1400 self.assertRaises(OSError, os.setreuid, 0, 0)
1401 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1402 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001403
Serhiy Storchaka43767632013-11-03 21:31:38 +02001404 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1405 def test_setreuid_neg1(self):
1406 # Needs to accept -1. We run this in a subprocess to avoid
1407 # altering the test runner's process state (issue8045).
1408 subprocess.check_call([
1409 sys.executable, '-c',
1410 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001411
Serhiy Storchaka43767632013-11-03 21:31:38 +02001412 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1413 def test_setregid(self):
1414 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1415 self.assertRaises(OSError, os.setregid, 0, 0)
1416 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1417 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001418
Serhiy Storchaka43767632013-11-03 21:31:38 +02001419 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1420 def test_setregid_neg1(self):
1421 # Needs to accept -1. We run this in a subprocess to avoid
1422 # altering the test runner's process state (issue8045).
1423 subprocess.check_call([
1424 sys.executable, '-c',
1425 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001426
Serhiy Storchaka43767632013-11-03 21:31:38 +02001427@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1428class Pep383Tests(unittest.TestCase):
1429 def setUp(self):
1430 if support.TESTFN_UNENCODABLE:
1431 self.dir = support.TESTFN_UNENCODABLE
1432 elif support.TESTFN_NONASCII:
1433 self.dir = support.TESTFN_NONASCII
1434 else:
1435 self.dir = support.TESTFN
1436 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001437
Serhiy Storchaka43767632013-11-03 21:31:38 +02001438 bytesfn = []
1439 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001440 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001441 fn = os.fsencode(fn)
1442 except UnicodeEncodeError:
1443 return
1444 bytesfn.append(fn)
1445 add_filename(support.TESTFN_UNICODE)
1446 if support.TESTFN_UNENCODABLE:
1447 add_filename(support.TESTFN_UNENCODABLE)
1448 if support.TESTFN_NONASCII:
1449 add_filename(support.TESTFN_NONASCII)
1450 if not bytesfn:
1451 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001452
Serhiy Storchaka43767632013-11-03 21:31:38 +02001453 self.unicodefn = set()
1454 os.mkdir(self.dir)
1455 try:
1456 for fn in bytesfn:
1457 support.create_empty_file(os.path.join(self.bdir, fn))
1458 fn = os.fsdecode(fn)
1459 if fn in self.unicodefn:
1460 raise ValueError("duplicate filename")
1461 self.unicodefn.add(fn)
1462 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001463 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001464 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001465
Serhiy Storchaka43767632013-11-03 21:31:38 +02001466 def tearDown(self):
1467 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001468
Serhiy Storchaka43767632013-11-03 21:31:38 +02001469 def test_listdir(self):
1470 expected = self.unicodefn
1471 found = set(os.listdir(self.dir))
1472 self.assertEqual(found, expected)
1473 # test listdir without arguments
1474 current_directory = os.getcwd()
1475 try:
1476 os.chdir(os.sep)
1477 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1478 finally:
1479 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001480
Serhiy Storchaka43767632013-11-03 21:31:38 +02001481 def test_open(self):
1482 for fn in self.unicodefn:
1483 f = open(os.path.join(self.dir, fn), 'rb')
1484 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001485
Serhiy Storchaka43767632013-11-03 21:31:38 +02001486 @unittest.skipUnless(hasattr(os, 'statvfs'),
1487 "need os.statvfs()")
1488 def test_statvfs(self):
1489 # issue #9645
1490 for fn in self.unicodefn:
1491 # should not fail with file not found error
1492 fullname = os.path.join(self.dir, fn)
1493 os.statvfs(fullname)
1494
1495 def test_stat(self):
1496 for fn in self.unicodefn:
1497 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001498
Brian Curtineb24d742010-04-12 17:16:38 +00001499@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1500class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001501 def _kill(self, sig):
1502 # Start sys.executable as a subprocess and communicate from the
1503 # subprocess to the parent that the interpreter is ready. When it
1504 # becomes ready, send *sig* via os.kill to the subprocess and check
1505 # that the return code is equal to *sig*.
1506 import ctypes
1507 from ctypes import wintypes
1508 import msvcrt
1509
1510 # Since we can't access the contents of the process' stdout until the
1511 # process has exited, use PeekNamedPipe to see what's inside stdout
1512 # without waiting. This is done so we can tell that the interpreter
1513 # is started and running at a point where it could handle a signal.
1514 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1515 PeekNamedPipe.restype = wintypes.BOOL
1516 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1517 ctypes.POINTER(ctypes.c_char), # stdout buf
1518 wintypes.DWORD, # Buffer size
1519 ctypes.POINTER(wintypes.DWORD), # bytes read
1520 ctypes.POINTER(wintypes.DWORD), # bytes avail
1521 ctypes.POINTER(wintypes.DWORD)) # bytes left
1522 msg = "running"
1523 proc = subprocess.Popen([sys.executable, "-c",
1524 "import sys;"
1525 "sys.stdout.write('{}');"
1526 "sys.stdout.flush();"
1527 "input()".format(msg)],
1528 stdout=subprocess.PIPE,
1529 stderr=subprocess.PIPE,
1530 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001531 self.addCleanup(proc.stdout.close)
1532 self.addCleanup(proc.stderr.close)
1533 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001534
1535 count, max = 0, 100
1536 while count < max and proc.poll() is None:
1537 # Create a string buffer to store the result of stdout from the pipe
1538 buf = ctypes.create_string_buffer(len(msg))
1539 # Obtain the text currently in proc.stdout
1540 # Bytes read/avail/left are left as NULL and unused
1541 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1542 buf, ctypes.sizeof(buf), None, None, None)
1543 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1544 if buf.value:
1545 self.assertEqual(msg, buf.value.decode())
1546 break
1547 time.sleep(0.1)
1548 count += 1
1549 else:
1550 self.fail("Did not receive communication from the subprocess")
1551
Brian Curtineb24d742010-04-12 17:16:38 +00001552 os.kill(proc.pid, sig)
1553 self.assertEqual(proc.wait(), sig)
1554
1555 def test_kill_sigterm(self):
1556 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001557 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001558
1559 def test_kill_int(self):
1560 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001561 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001562
1563 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001564 tagname = "test_os_%s" % uuid.uuid1()
1565 m = mmap.mmap(-1, 1, tagname)
1566 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001567 # Run a script which has console control handling enabled.
1568 proc = subprocess.Popen([sys.executable,
1569 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001570 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001571 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1572 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001573 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001574 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001575 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001576 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001577 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001578 count += 1
1579 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001580 # Forcefully kill the process if we weren't able to signal it.
1581 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001582 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001583 os.kill(proc.pid, event)
1584 # proc.send_signal(event) could also be done here.
1585 # Allow time for the signal to be passed and the process to exit.
1586 time.sleep(0.5)
1587 if not proc.poll():
1588 # Forcefully kill the process if we weren't able to signal it.
1589 os.kill(proc.pid, signal.SIGINT)
1590 self.fail("subprocess did not stop on {}".format(name))
1591
1592 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1593 def test_CTRL_C_EVENT(self):
1594 from ctypes import wintypes
1595 import ctypes
1596
1597 # Make a NULL value by creating a pointer with no argument.
1598 NULL = ctypes.POINTER(ctypes.c_int)()
1599 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1600 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1601 wintypes.BOOL)
1602 SetConsoleCtrlHandler.restype = wintypes.BOOL
1603
1604 # Calling this with NULL and FALSE causes the calling process to
1605 # handle CTRL+C, rather than ignore it. This property is inherited
1606 # by subprocesses.
1607 SetConsoleCtrlHandler(NULL, 0)
1608
1609 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1610
1611 def test_CTRL_BREAK_EVENT(self):
1612 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1613
1614
Brian Curtind40e6f72010-07-08 21:39:08 +00001615@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001616class Win32ListdirTests(unittest.TestCase):
1617 """Test listdir on Windows."""
1618
1619 def setUp(self):
1620 self.created_paths = []
1621 for i in range(2):
1622 dir_name = 'SUB%d' % i
1623 dir_path = os.path.join(support.TESTFN, dir_name)
1624 file_name = 'FILE%d' % i
1625 file_path = os.path.join(support.TESTFN, file_name)
1626 os.makedirs(dir_path)
1627 with open(file_path, 'w') as f:
1628 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1629 self.created_paths.extend([dir_name, file_name])
1630 self.created_paths.sort()
1631
1632 def tearDown(self):
1633 shutil.rmtree(support.TESTFN)
1634
1635 def test_listdir_no_extended_path(self):
1636 """Test when the path is not an "extended" path."""
1637 # unicode
1638 self.assertEqual(
1639 sorted(os.listdir(support.TESTFN)),
1640 self.created_paths)
1641 # bytes
1642 self.assertEqual(
1643 sorted(os.listdir(os.fsencode(support.TESTFN))),
1644 [os.fsencode(path) for path in self.created_paths])
1645
1646 def test_listdir_extended_path(self):
1647 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01001648 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01001649 # unicode
1650 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
1651 self.assertEqual(
1652 sorted(os.listdir(path)),
1653 self.created_paths)
1654 # bytes
1655 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
1656 self.assertEqual(
1657 sorted(os.listdir(path)),
1658 [os.fsencode(path) for path in self.created_paths])
1659
1660
1661@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001662@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001663class Win32SymlinkTests(unittest.TestCase):
1664 filelink = 'filelinktest'
1665 filelink_target = os.path.abspath(__file__)
1666 dirlink = 'dirlinktest'
1667 dirlink_target = os.path.dirname(filelink_target)
1668 missing_link = 'missing link'
1669
1670 def setUp(self):
1671 assert os.path.exists(self.dirlink_target)
1672 assert os.path.exists(self.filelink_target)
1673 assert not os.path.exists(self.dirlink)
1674 assert not os.path.exists(self.filelink)
1675 assert not os.path.exists(self.missing_link)
1676
1677 def tearDown(self):
1678 if os.path.exists(self.filelink):
1679 os.remove(self.filelink)
1680 if os.path.exists(self.dirlink):
1681 os.rmdir(self.dirlink)
1682 if os.path.lexists(self.missing_link):
1683 os.remove(self.missing_link)
1684
1685 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001686 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001687 self.assertTrue(os.path.exists(self.dirlink))
1688 self.assertTrue(os.path.isdir(self.dirlink))
1689 self.assertTrue(os.path.islink(self.dirlink))
1690 self.check_stat(self.dirlink, self.dirlink_target)
1691
1692 def test_file_link(self):
1693 os.symlink(self.filelink_target, self.filelink)
1694 self.assertTrue(os.path.exists(self.filelink))
1695 self.assertTrue(os.path.isfile(self.filelink))
1696 self.assertTrue(os.path.islink(self.filelink))
1697 self.check_stat(self.filelink, self.filelink_target)
1698
1699 def _create_missing_dir_link(self):
1700 'Create a "directory" link to a non-existent target'
1701 linkname = self.missing_link
1702 if os.path.lexists(linkname):
1703 os.remove(linkname)
1704 target = r'c:\\target does not exist.29r3c740'
1705 assert not os.path.exists(target)
1706 target_is_dir = True
1707 os.symlink(target, linkname, target_is_dir)
1708
1709 def test_remove_directory_link_to_missing_target(self):
1710 self._create_missing_dir_link()
1711 # For compatibility with Unix, os.remove will check the
1712 # directory status and call RemoveDirectory if the symlink
1713 # was created with target_is_dir==True.
1714 os.remove(self.missing_link)
1715
1716 @unittest.skip("currently fails; consider for improvement")
1717 def test_isdir_on_directory_link_to_missing_target(self):
1718 self._create_missing_dir_link()
1719 # consider having isdir return true for directory links
1720 self.assertTrue(os.path.isdir(self.missing_link))
1721
1722 @unittest.skip("currently fails; consider for improvement")
1723 def test_rmdir_on_directory_link_to_missing_target(self):
1724 self._create_missing_dir_link()
1725 # consider allowing rmdir to remove directory links
1726 os.rmdir(self.missing_link)
1727
1728 def check_stat(self, link, target):
1729 self.assertEqual(os.stat(link), os.stat(target))
1730 self.assertNotEqual(os.lstat(link), os.stat(link))
1731
Brian Curtind25aef52011-06-13 15:16:04 -05001732 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001733 with warnings.catch_warnings():
1734 warnings.simplefilter("ignore", DeprecationWarning)
1735 self.assertEqual(os.stat(bytes_link), os.stat(target))
1736 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001737
1738 def test_12084(self):
1739 level1 = os.path.abspath(support.TESTFN)
1740 level2 = os.path.join(level1, "level2")
1741 level3 = os.path.join(level2, "level3")
1742 try:
1743 os.mkdir(level1)
1744 os.mkdir(level2)
1745 os.mkdir(level3)
1746
1747 file1 = os.path.abspath(os.path.join(level1, "file1"))
1748
1749 with open(file1, "w") as f:
1750 f.write("file1")
1751
1752 orig_dir = os.getcwd()
1753 try:
1754 os.chdir(level2)
1755 link = os.path.join(level2, "link")
1756 os.symlink(os.path.relpath(file1), "link")
1757 self.assertIn("link", os.listdir(os.getcwd()))
1758
1759 # Check os.stat calls from the same dir as the link
1760 self.assertEqual(os.stat(file1), os.stat("link"))
1761
1762 # Check os.stat calls from a dir below the link
1763 os.chdir(level1)
1764 self.assertEqual(os.stat(file1),
1765 os.stat(os.path.relpath(link)))
1766
1767 # Check os.stat calls from a dir above the link
1768 os.chdir(level3)
1769 self.assertEqual(os.stat(file1),
1770 os.stat(os.path.relpath(link)))
1771 finally:
1772 os.chdir(orig_dir)
1773 except OSError as err:
1774 self.fail(err)
1775 finally:
1776 os.remove(file1)
1777 shutil.rmtree(level1)
1778
Brian Curtind40e6f72010-07-08 21:39:08 +00001779
Tim Golden0321cf22014-05-05 19:46:17 +01001780@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1781class Win32JunctionTests(unittest.TestCase):
1782 junction = 'junctiontest'
1783 junction_target = os.path.dirname(os.path.abspath(__file__))
1784
1785 def setUp(self):
1786 assert os.path.exists(self.junction_target)
1787 assert not os.path.exists(self.junction)
1788
1789 def tearDown(self):
1790 if os.path.exists(self.junction):
1791 # os.rmdir delegates to Windows' RemoveDirectoryW,
1792 # which removes junction points safely.
1793 os.rmdir(self.junction)
1794
1795 def test_create_junction(self):
1796 _winapi.CreateJunction(self.junction_target, self.junction)
1797 self.assertTrue(os.path.exists(self.junction))
1798 self.assertTrue(os.path.isdir(self.junction))
1799
1800 # Junctions are not recognized as links.
1801 self.assertFalse(os.path.islink(self.junction))
1802
1803 def test_unlink_removes_junction(self):
1804 _winapi.CreateJunction(self.junction_target, self.junction)
1805 self.assertTrue(os.path.exists(self.junction))
1806
1807 os.unlink(self.junction)
1808 self.assertFalse(os.path.exists(self.junction))
1809
1810
Jason R. Coombs3a092862013-05-27 23:21:28 -04001811@support.skip_unless_symlink
1812class NonLocalSymlinkTests(unittest.TestCase):
1813
1814 def setUp(self):
1815 """
1816 Create this structure:
1817
1818 base
1819 \___ some_dir
1820 """
1821 os.makedirs('base/some_dir')
1822
1823 def tearDown(self):
1824 shutil.rmtree('base')
1825
1826 def test_directory_link_nonlocal(self):
1827 """
1828 The symlink target should resolve relative to the link, not relative
1829 to the current directory.
1830
1831 Then, link base/some_link -> base/some_dir and ensure that some_link
1832 is resolved as a directory.
1833
1834 In issue13772, it was discovered that directory detection failed if
1835 the symlink target was not specified relative to the current
1836 directory, which was a defect in the implementation.
1837 """
1838 src = os.path.join('base', 'some_link')
1839 os.symlink('some_dir', src)
1840 assert os.path.isdir(src)
1841
1842
Victor Stinnere8d51452010-08-19 01:05:19 +00001843class FSEncodingTests(unittest.TestCase):
1844 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001845 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1846 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001847
Victor Stinnere8d51452010-08-19 01:05:19 +00001848 def test_identity(self):
1849 # assert fsdecode(fsencode(x)) == x
1850 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1851 try:
1852 bytesfn = os.fsencode(fn)
1853 except UnicodeEncodeError:
1854 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001855 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001856
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001857
Brett Cannonefb00c02012-02-29 18:31:31 -05001858
1859class DeviceEncodingTests(unittest.TestCase):
1860
1861 def test_bad_fd(self):
1862 # Return None when an fd doesn't actually exist.
1863 self.assertIsNone(os.device_encoding(123456))
1864
Philip Jenveye308b7c2012-02-29 16:16:15 -08001865 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
1866 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08001867 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05001868 def test_device_encoding(self):
1869 encoding = os.device_encoding(0)
1870 self.assertIsNotNone(encoding)
1871 self.assertTrue(codecs.lookup(encoding))
1872
1873
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001874class PidTests(unittest.TestCase):
1875 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1876 def test_getppid(self):
1877 p = subprocess.Popen([sys.executable, '-c',
1878 'import os; print(os.getppid())'],
1879 stdout=subprocess.PIPE)
1880 stdout, _ = p.communicate()
1881 # We are the parent of our subprocess
1882 self.assertEqual(int(stdout), os.getpid())
1883
1884
Brian Curtin0151b8e2010-09-24 13:43:43 +00001885# The introduction of this TestCase caused at least two different errors on
1886# *nix buildbots. Temporarily skip this to let the buildbots move along.
1887@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001888@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1889class LoginTests(unittest.TestCase):
1890 def test_getlogin(self):
1891 user_name = os.getlogin()
1892 self.assertNotEqual(len(user_name), 0)
1893
1894
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001895@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1896 "needs os.getpriority and os.setpriority")
1897class ProgramPriorityTests(unittest.TestCase):
1898 """Tests for os.getpriority() and os.setpriority()."""
1899
1900 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001901
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001902 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1903 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1904 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001905 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1906 if base >= 19 and new_prio <= 19:
1907 raise unittest.SkipTest(
1908 "unable to reliably test setpriority at current nice level of %s" % base)
1909 else:
1910 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001911 finally:
1912 try:
1913 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1914 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001915 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001916 raise
1917
1918
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001919if threading is not None:
1920 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001921
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001922 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001923
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001924 def __init__(self, conn):
1925 asynchat.async_chat.__init__(self, conn)
1926 self.in_buffer = []
1927 self.closed = False
1928 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001929
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001930 def handle_read(self):
1931 data = self.recv(4096)
1932 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001933
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001934 def get_data(self):
1935 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001936
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001937 def handle_close(self):
1938 self.close()
1939 self.closed = True
1940
1941 def handle_error(self):
1942 raise
1943
1944 def __init__(self, address):
1945 threading.Thread.__init__(self)
1946 asyncore.dispatcher.__init__(self)
1947 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1948 self.bind(address)
1949 self.listen(5)
1950 self.host, self.port = self.socket.getsockname()[:2]
1951 self.handler_instance = None
1952 self._active = False
1953 self._active_lock = threading.Lock()
1954
1955 # --- public API
1956
1957 @property
1958 def running(self):
1959 return self._active
1960
1961 def start(self):
1962 assert not self.running
1963 self.__flag = threading.Event()
1964 threading.Thread.start(self)
1965 self.__flag.wait()
1966
1967 def stop(self):
1968 assert self.running
1969 self._active = False
1970 self.join()
1971
1972 def wait(self):
1973 # wait for handler connection to be closed, then stop the server
1974 while not getattr(self.handler_instance, "closed", False):
1975 time.sleep(0.001)
1976 self.stop()
1977
1978 # --- internals
1979
1980 def run(self):
1981 self._active = True
1982 self.__flag.set()
1983 while self._active and asyncore.socket_map:
1984 self._active_lock.acquire()
1985 asyncore.loop(timeout=0.001, count=1)
1986 self._active_lock.release()
1987 asyncore.close_all()
1988
1989 def handle_accept(self):
1990 conn, addr = self.accept()
1991 self.handler_instance = self.Handler(conn)
1992
1993 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001994 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001995 handle_read = handle_connect
1996
1997 def writable(self):
1998 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001999
2000 def handle_error(self):
2001 raise
2002
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002003
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002004@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002005@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2006class TestSendfile(unittest.TestCase):
2007
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002008 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002009 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002010 not sys.platform.startswith("solaris") and \
2011 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002012 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2013 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002014
2015 @classmethod
2016 def setUpClass(cls):
2017 with open(support.TESTFN, "wb") as f:
2018 f.write(cls.DATA)
2019
2020 @classmethod
2021 def tearDownClass(cls):
2022 support.unlink(support.TESTFN)
2023
2024 def setUp(self):
2025 self.server = SendfileTestServer((support.HOST, 0))
2026 self.server.start()
2027 self.client = socket.socket()
2028 self.client.connect((self.server.host, self.server.port))
2029 self.client.settimeout(1)
2030 # synchronize by waiting for "220 ready" response
2031 self.client.recv(1024)
2032 self.sockno = self.client.fileno()
2033 self.file = open(support.TESTFN, 'rb')
2034 self.fileno = self.file.fileno()
2035
2036 def tearDown(self):
2037 self.file.close()
2038 self.client.close()
2039 if self.server.running:
2040 self.server.stop()
2041
2042 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2043 """A higher level wrapper representing how an application is
2044 supposed to use sendfile().
2045 """
2046 while 1:
2047 try:
2048 if self.SUPPORT_HEADERS_TRAILERS:
2049 return os.sendfile(sock, file, offset, nbytes, headers,
2050 trailers)
2051 else:
2052 return os.sendfile(sock, file, offset, nbytes)
2053 except OSError as err:
2054 if err.errno == errno.ECONNRESET:
2055 # disconnected
2056 raise
2057 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2058 # we have to retry send data
2059 continue
2060 else:
2061 raise
2062
2063 def test_send_whole_file(self):
2064 # normal send
2065 total_sent = 0
2066 offset = 0
2067 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002068 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002069 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2070 if sent == 0:
2071 break
2072 offset += sent
2073 total_sent += sent
2074 self.assertTrue(sent <= nbytes)
2075 self.assertEqual(offset, total_sent)
2076
2077 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002078 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002079 self.client.close()
2080 self.server.wait()
2081 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002082 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002083 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002084
2085 def test_send_at_certain_offset(self):
2086 # start sending a file at a certain offset
2087 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002088 offset = len(self.DATA) // 2
2089 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002090 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002091 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002092 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2093 if sent == 0:
2094 break
2095 offset += sent
2096 total_sent += sent
2097 self.assertTrue(sent <= nbytes)
2098
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002099 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002100 self.client.close()
2101 self.server.wait()
2102 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002103 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002104 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002105 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002106 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002107
2108 def test_offset_overflow(self):
2109 # specify an offset > file size
2110 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002111 try:
2112 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2113 except OSError as e:
2114 # Solaris can raise EINVAL if offset >= file length, ignore.
2115 if e.errno != errno.EINVAL:
2116 raise
2117 else:
2118 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002119 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002120 self.client.close()
2121 self.server.wait()
2122 data = self.server.handler_instance.get_data()
2123 self.assertEqual(data, b'')
2124
2125 def test_invalid_offset(self):
2126 with self.assertRaises(OSError) as cm:
2127 os.sendfile(self.sockno, self.fileno, -1, 4096)
2128 self.assertEqual(cm.exception.errno, errno.EINVAL)
2129
2130 # --- headers / trailers tests
2131
Serhiy Storchaka43767632013-11-03 21:31:38 +02002132 @requires_headers_trailers
2133 def test_headers(self):
2134 total_sent = 0
2135 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2136 headers=[b"x" * 512])
2137 total_sent += sent
2138 offset = 4096
2139 nbytes = 4096
2140 while 1:
2141 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2142 offset, nbytes)
2143 if sent == 0:
2144 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002145 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002146 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002147
Serhiy Storchaka43767632013-11-03 21:31:38 +02002148 expected_data = b"x" * 512 + self.DATA
2149 self.assertEqual(total_sent, len(expected_data))
2150 self.client.close()
2151 self.server.wait()
2152 data = self.server.handler_instance.get_data()
2153 self.assertEqual(hash(data), hash(expected_data))
2154
2155 @requires_headers_trailers
2156 def test_trailers(self):
2157 TESTFN2 = support.TESTFN + "2"
2158 file_data = b"abcdef"
2159 with open(TESTFN2, 'wb') as f:
2160 f.write(file_data)
2161 with open(TESTFN2, 'rb')as f:
2162 self.addCleanup(os.remove, TESTFN2)
2163 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2164 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002165 self.client.close()
2166 self.server.wait()
2167 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002168 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002169
Serhiy Storchaka43767632013-11-03 21:31:38 +02002170 @requires_headers_trailers
2171 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2172 'test needs os.SF_NODISKIO')
2173 def test_flags(self):
2174 try:
2175 os.sendfile(self.sockno, self.fileno, 0, 4096,
2176 flags=os.SF_NODISKIO)
2177 except OSError as err:
2178 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2179 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002180
2181
Larry Hastings9cf065c2012-06-22 16:30:09 -07002182def supports_extended_attributes():
2183 if not hasattr(os, "setxattr"):
2184 return False
2185 try:
2186 with open(support.TESTFN, "wb") as fp:
2187 try:
2188 os.setxattr(fp.fileno(), b"user.test", b"")
2189 except OSError:
2190 return False
2191 finally:
2192 support.unlink(support.TESTFN)
2193 # Kernels < 2.6.39 don't respect setxattr flags.
2194 kernel_version = platform.release()
2195 m = re.match("2.6.(\d{1,2})", kernel_version)
2196 return m is None or int(m.group(1)) >= 39
2197
2198
2199@unittest.skipUnless(supports_extended_attributes(),
2200 "no non-broken extended attribute support")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002201class ExtendedAttributeTests(unittest.TestCase):
2202
2203 def tearDown(self):
2204 support.unlink(support.TESTFN)
2205
Larry Hastings9cf065c2012-06-22 16:30:09 -07002206 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002207 fn = support.TESTFN
2208 open(fn, "wb").close()
2209 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002210 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002211 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002212 init_xattr = listxattr(fn)
2213 self.assertIsInstance(init_xattr, list)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002214 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002215 xattr = set(init_xattr)
2216 xattr.add("user.test")
2217 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002218 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2219 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2220 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002221 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002222 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002223 self.assertEqual(cm.exception.errno, errno.EEXIST)
2224 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002225 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002226 self.assertEqual(cm.exception.errno, errno.ENODATA)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002227 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002228 xattr.add("user.test2")
2229 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002230 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002231 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002232 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002233 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002234 xattr.remove("user.test")
2235 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002236 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2237 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2238 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2239 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002240 many = sorted("user.test{}".format(i) for i in range(100))
2241 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002242 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002243 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002244
Larry Hastings9cf065c2012-06-22 16:30:09 -07002245 def _check_xattrs(self, *args, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002246 def make_bytes(s):
2247 return bytes(s, "ascii")
Larry Hastings9cf065c2012-06-22 16:30:09 -07002248 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002249 support.unlink(support.TESTFN)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002250 self._check_xattrs_str(make_bytes, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002251
2252 def test_simple(self):
2253 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2254 os.listxattr)
2255
2256 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002257 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2258 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002259
2260 def test_fds(self):
2261 def getxattr(path, *args):
2262 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002263 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002264 def setxattr(path, *args):
2265 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002266 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002267 def removexattr(path, *args):
2268 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002269 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002270 def listxattr(path, *args):
2271 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002272 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002273 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2274
2275
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002276@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2277class Win32DeprecatedBytesAPI(unittest.TestCase):
2278 def test_deprecated(self):
2279 import nt
2280 filename = os.fsencode(support.TESTFN)
2281 with warnings.catch_warnings():
2282 warnings.simplefilter("error", DeprecationWarning)
2283 for func, *args in (
2284 (nt._getfullpathname, filename),
2285 (nt._isdir, filename),
2286 (os.access, filename, os.R_OK),
2287 (os.chdir, filename),
2288 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01002289 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002290 (os.link, filename, filename),
2291 (os.listdir, filename),
2292 (os.lstat, filename),
2293 (os.mkdir, filename),
2294 (os.open, filename, os.O_RDONLY),
2295 (os.rename, filename, filename),
2296 (os.rmdir, filename),
2297 (os.startfile, filename),
2298 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002299 (os.unlink, filename),
2300 (os.utime, filename),
2301 ):
2302 self.assertRaises(DeprecationWarning, func, *args)
2303
Victor Stinner28216442011-11-16 00:34:44 +01002304 @support.skip_unless_symlink
2305 def test_symlink(self):
2306 filename = os.fsencode(support.TESTFN)
2307 with warnings.catch_warnings():
2308 warnings.simplefilter("error", DeprecationWarning)
2309 self.assertRaises(DeprecationWarning,
2310 os.symlink, filename, filename)
2311
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002312
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002313@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2314class TermsizeTests(unittest.TestCase):
2315 def test_does_not_crash(self):
2316 """Check if get_terminal_size() returns a meaningful value.
2317
2318 There's no easy portable way to actually check the size of the
2319 terminal, so let's check if it returns something sensible instead.
2320 """
2321 try:
2322 size = os.get_terminal_size()
2323 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002324 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002325 # Under win32 a generic OSError can be thrown if the
2326 # handle cannot be retrieved
2327 self.skipTest("failed to query terminal size")
2328 raise
2329
Antoine Pitroucfade362012-02-08 23:48:59 +01002330 self.assertGreaterEqual(size.columns, 0)
2331 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002332
2333 def test_stty_match(self):
2334 """Check if stty returns the same results
2335
2336 stty actually tests stdin, so get_terminal_size is invoked on
2337 stdin explicitly. If stty succeeded, then get_terminal_size()
2338 should work too.
2339 """
2340 try:
2341 size = subprocess.check_output(['stty', 'size']).decode().split()
2342 except (FileNotFoundError, subprocess.CalledProcessError):
2343 self.skipTest("stty invocation failed")
2344 expected = (int(size[1]), int(size[0])) # reversed order
2345
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002346 try:
2347 actual = os.get_terminal_size(sys.__stdin__.fileno())
2348 except OSError as e:
2349 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2350 # Under win32 a generic OSError can be thrown if the
2351 # handle cannot be retrieved
2352 self.skipTest("failed to query terminal size")
2353 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002354 self.assertEqual(expected, actual)
2355
2356
Victor Stinner292c8352012-10-30 02:17:38 +01002357class OSErrorTests(unittest.TestCase):
2358 def setUp(self):
2359 class Str(str):
2360 pass
2361
Victor Stinnerafe17062012-10-31 22:47:43 +01002362 self.bytes_filenames = []
2363 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002364 if support.TESTFN_UNENCODABLE is not None:
2365 decoded = support.TESTFN_UNENCODABLE
2366 else:
2367 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002368 self.unicode_filenames.append(decoded)
2369 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002370 if support.TESTFN_UNDECODABLE is not None:
2371 encoded = support.TESTFN_UNDECODABLE
2372 else:
2373 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002374 self.bytes_filenames.append(encoded)
2375 self.bytes_filenames.append(memoryview(encoded))
2376
2377 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002378
2379 def test_oserror_filename(self):
2380 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002381 (self.filenames, os.chdir,),
2382 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002383 (self.filenames, os.lstat,),
2384 (self.filenames, os.open, os.O_RDONLY),
2385 (self.filenames, os.rmdir,),
2386 (self.filenames, os.stat,),
2387 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002388 ]
2389 if sys.platform == "win32":
2390 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002391 (self.bytes_filenames, os.rename, b"dst"),
2392 (self.bytes_filenames, os.replace, b"dst"),
2393 (self.unicode_filenames, os.rename, "dst"),
2394 (self.unicode_filenames, os.replace, "dst"),
Victor Stinner64e039a2012-11-07 00:10:14 +01002395 # Issue #16414: Don't test undecodable names with listdir()
2396 # because of a Windows bug.
2397 #
2398 # With the ANSI code page 932, os.listdir(b'\xe7') return an
2399 # empty list (instead of failing), whereas os.listdir(b'\xff')
2400 # raises a FileNotFoundError. It looks like a Windows bug:
2401 # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
2402 # fails with ERROR_FILE_NOT_FOUND (2), instead of
2403 # ERROR_PATH_NOT_FOUND (3).
2404 (self.unicode_filenames, os.listdir,),
Victor Stinner292c8352012-10-30 02:17:38 +01002405 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002406 else:
2407 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002408 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002409 (self.filenames, os.rename, "dst"),
2410 (self.filenames, os.replace, "dst"),
2411 ))
2412 if hasattr(os, "chown"):
2413 funcs.append((self.filenames, os.chown, 0, 0))
2414 if hasattr(os, "lchown"):
2415 funcs.append((self.filenames, os.lchown, 0, 0))
2416 if hasattr(os, "truncate"):
2417 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002418 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002419 funcs.append((self.filenames, os.chflags, 0))
2420 if hasattr(os, "lchflags"):
2421 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002422 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002423 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002424 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002425 if sys.platform == "win32":
2426 funcs.append((self.bytes_filenames, os.link, b"dst"))
2427 funcs.append((self.unicode_filenames, os.link, "dst"))
2428 else:
2429 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002430 if hasattr(os, "listxattr"):
2431 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002432 (self.filenames, os.listxattr,),
2433 (self.filenames, os.getxattr, "user.test"),
2434 (self.filenames, os.setxattr, "user.test", b'user'),
2435 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002436 ))
2437 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002438 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002439 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002440 if sys.platform == "win32":
2441 funcs.append((self.unicode_filenames, os.readlink,))
2442 else:
2443 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002444
Victor Stinnerafe17062012-10-31 22:47:43 +01002445 for filenames, func, *func_args in funcs:
2446 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002447 try:
2448 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002449 except OSError as err:
Victor Stinner292c8352012-10-30 02:17:38 +01002450 self.assertIs(err.filename, name)
2451 else:
2452 self.fail("No exception thrown by {}".format(func))
2453
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002454class CPUCountTests(unittest.TestCase):
2455 def test_cpu_count(self):
2456 cpus = os.cpu_count()
2457 if cpus is not None:
2458 self.assertIsInstance(cpus, int)
2459 self.assertGreater(cpus, 0)
2460 else:
2461 self.skipTest("Could not determine the number of CPUs")
2462
Victor Stinnerdaf45552013-08-28 00:53:59 +02002463
2464class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002465 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002466 fd = os.open(__file__, os.O_RDONLY)
2467 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002468 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002469
Victor Stinnerdaf45552013-08-28 00:53:59 +02002470 os.set_inheritable(fd, True)
2471 self.assertEqual(os.get_inheritable(fd), True)
2472
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002473 @unittest.skipIf(fcntl is None, "need fcntl")
2474 def test_get_inheritable_cloexec(self):
2475 fd = os.open(__file__, os.O_RDONLY)
2476 self.addCleanup(os.close, fd)
2477 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002478
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002479 # clear FD_CLOEXEC flag
2480 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2481 flags &= ~fcntl.FD_CLOEXEC
2482 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002483
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002484 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002485
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002486 @unittest.skipIf(fcntl is None, "need fcntl")
2487 def test_set_inheritable_cloexec(self):
2488 fd = os.open(__file__, os.O_RDONLY)
2489 self.addCleanup(os.close, fd)
2490 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2491 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002492
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002493 os.set_inheritable(fd, True)
2494 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2495 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002496
Victor Stinnerdaf45552013-08-28 00:53:59 +02002497 def test_open(self):
2498 fd = os.open(__file__, os.O_RDONLY)
2499 self.addCleanup(os.close, fd)
2500 self.assertEqual(os.get_inheritable(fd), False)
2501
2502 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2503 def test_pipe(self):
2504 rfd, wfd = os.pipe()
2505 self.addCleanup(os.close, rfd)
2506 self.addCleanup(os.close, wfd)
2507 self.assertEqual(os.get_inheritable(rfd), False)
2508 self.assertEqual(os.get_inheritable(wfd), False)
2509
2510 def test_dup(self):
2511 fd1 = os.open(__file__, os.O_RDONLY)
2512 self.addCleanup(os.close, fd1)
2513
2514 fd2 = os.dup(fd1)
2515 self.addCleanup(os.close, fd2)
2516 self.assertEqual(os.get_inheritable(fd2), False)
2517
2518 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2519 def test_dup2(self):
2520 fd = os.open(__file__, os.O_RDONLY)
2521 self.addCleanup(os.close, fd)
2522
2523 # inheritable by default
2524 fd2 = os.open(__file__, os.O_RDONLY)
2525 try:
2526 os.dup2(fd, fd2)
2527 self.assertEqual(os.get_inheritable(fd2), True)
2528 finally:
2529 os.close(fd2)
2530
2531 # force non-inheritable
2532 fd3 = os.open(__file__, os.O_RDONLY)
2533 try:
2534 os.dup2(fd, fd3, inheritable=False)
2535 self.assertEqual(os.get_inheritable(fd3), False)
2536 finally:
2537 os.close(fd3)
2538
2539 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2540 def test_openpty(self):
2541 master_fd, slave_fd = os.openpty()
2542 self.addCleanup(os.close, master_fd)
2543 self.addCleanup(os.close, slave_fd)
2544 self.assertEqual(os.get_inheritable(master_fd), False)
2545 self.assertEqual(os.get_inheritable(slave_fd), False)
2546
2547
Antoine Pitrouf26ad712011-07-15 23:00:56 +02002548@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00002549def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002550 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00002551 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002552 StatAttributeTests,
2553 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002554 WalkTests,
Charles-François Natali7372b062012-02-05 15:15:38 +01002555 FwalkTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002556 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00002557 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00002558 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00002559 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002560 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002561 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00002562 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00002563 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002564 Win32KillTests,
Tim Golden781bbeb2013-10-25 20:24:06 +01002565 Win32ListdirTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00002566 Win32SymlinkTests,
Jason R. Coombs3a092862013-05-27 23:21:28 -04002567 NonLocalSymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00002568 FSEncodingTests,
Brett Cannonefb00c02012-02-29 18:31:31 -05002569 DeviceEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002570 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002571 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00002572 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002573 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002574 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04002575 ExtendedAttributeTests,
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002576 Win32DeprecatedBytesAPI,
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002577 TermsizeTests,
Victor Stinner292c8352012-10-30 02:17:38 +01002578 OSErrorTests,
Andrew Svetlov405faed2012-12-25 12:18:09 +02002579 RemoveDirsTests,
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002580 CPUCountTests,
Victor Stinnerdaf45552013-08-28 00:53:59 +02002581 FDInheritanceTests,
Tim Golden0321cf22014-05-05 19:46:17 +01002582 Win32JunctionTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002583 )
Fred Drake2e2be372001-09-20 21:33:42 +00002584
2585if __name__ == "__main__":
2586 test_main()