blob: a04704875a7300da3509f21763bad4a12289d04e [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070027import decimal
28import fractions
Christian Heimes25827622013-10-12 01:27:08 +020029import pickle
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000030try:
31 import threading
32except ImportError:
33 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020047 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020048except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020049 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020050
Georg Brandl2daf6ae2012-02-20 19:54:16 +010051from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000052
# os.stat_float_times() is deprecated (hence the DeprecationWarning filter)
# and does not exist on newer interpreters, so only call it when present.
if hasattr(os, "stat_float_times"):
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        os.stat_float_times(True)
st = os.stat(__file__)
stat_supports_subsecond = (
    # check if float and int timestamps are different
    (st.st_atime != st[7])
    or (st.st_mtime != st[8])
    or (st.st_ctime != st[9]))

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
if hasattr(sys, 'thread_info') and sys.thread_info.version:
    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
else:
    USING_LINUXTHREADS = False

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
74
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file functions, run against support.TESTFN."""

    def setUp(self):
        # Remove any leftover TESTFN; the same routine doubles as tearDown.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the refcount of the
        # path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        with open(support.TESTFN, "wb") as fp:
            fp.write(b'test')
        self.addCleanup(support.unlink, support.TESTFN)

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the given command line and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the descriptor with os.fdopen(*args)
        # and close the resulting file object (which closes the descriptor).
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination and remove
        # the source name.
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
201
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200202
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for os.stat()/os.statvfs() results and for os.utime().

    Each test runs against a directory support.TESTFN containing a single
    3-byte file, self.fname.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        # Shared checks for the str and bytes variants of the file name.
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # Indexed access yields integer timestamps.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Out-of-range indexing must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Either outcome
        # (success or TypeError) is accepted here.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other error is a genuine failure; do not fall through
            # with 'result' unbound.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  Either outcome
        # (success or TypeError) is accepted here.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other error is a genuine failure; do not fall through
            # with 'result' unbound.
            raise

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def _test_utime(self, filename, attr, utime, delta):
        # filename: path whose timestamps are exercised.
        # attr:     callable (stat_result, field_name) -> timestamp value.
        # utime:    callable (filename, times) that sets atime/mtime.
        # delta:    tolerance, in the units returned by attr, between two
        #           consecutive "set to now" calls.
        #
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Read back the same path that was just touched, so that the
        # comparison below is between two timestamps of one object.
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)

    def _test_utime_ns(self, set_times_ns, test_dir=True):
        # Run _test_utime() on the st_*_ns fields, for the test file and
        # (when test_dir is true) for the test directory.
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    requires_utime_dir_fd = unittest.skipUnless(
                                os.utime in os.supports_dir_fd,
                                "dir_fd support for utime required for this test.")
    requires_utime_fd = unittest.skipUnless(
                                os.utime in os.supports_fd,
                                "fd support for utime required for this test.")
    requires_utime_nofollow_symlinks = unittest.skipUnless(
                                os.utime in os.supports_follow_symlinks,
                                "follow_symlinks support for utime required for this test.")

    @requires_utime_nofollow_symlinks
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.utime(file, ns=times, follow_symlinks=False)
        self._test_utime_ns(lutimes_ns)

    @requires_utime_fd
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.utime(f.fileno(), ns=times)
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        # Passing both 'times' and 'ns' must be rejected.
        with self.assertRaises(ValueError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)

    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        # set_time_func: callable (filename, atime, mtime) under test; the
        # times it sets must be readable back to millisecond precision.
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        # os.stat_float_times() is deprecated and does not exist on newer
        # interpreters, so only call it when present.
        if hasattr(os, "stat_float_times"):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                             times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_utime_nofollow_symlinks
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime), follow_symlinks=False)
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                             times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict tests to Win32, since there is no guarantee other
    # systems support centiseconds
    def get_file_system(path):
        # Plain function (no self): it is called from the decorators below
        # while the class body is being executed.  On win32 it returns the
        # file system name reported by GetVolumeInformationW for the drive
        # of 'path'; otherwise (or if that call fails) it returns None.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_1565150(self):
        t1 = 1159195039.25
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_large_time(self):
        t1 = 5000000000 # some day in 2128
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        # st_file_attributes must exist and be an int in [0, 0xFFFFFFFF].
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        result = os.stat(support.TESTFN)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)
584
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000585from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000586
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000587class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000588 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000589 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000590
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000591 def setUp(self):
592 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000593 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000594 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000595 for key, value in self._reference().items():
596 os.environ[key] = value
597
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000598 def tearDown(self):
599 os.environ.clear()
600 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000601 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000602 os.environb.clear()
603 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000604
Christian Heimes90333392007-11-01 19:08:42 +0000605 def _reference(self):
606 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
607
608 def _empty_mapping(self):
609 os.environ.clear()
610 return os.environ
611
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000612 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300613 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000614 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000615 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300616 os.environ.update(HELLO="World")
617 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
618 value = popen.read().strip()
619 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000620
Ezio Melottic7e139b2012-09-26 20:01:34 +0300621 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000622 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300623 with os.popen(
624 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
625 it = iter(popen)
626 self.assertEqual(next(it), "line1\n")
627 self.assertEqual(next(it), "line2\n")
628 self.assertEqual(next(it), "line3\n")
629 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000630
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000631 # Verify environ keys and values from the OS are of the
632 # correct str type.
633 def test_keyvalue_types(self):
634 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000635 self.assertEqual(type(key), str)
636 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000637
Christian Heimes90333392007-11-01 19:08:42 +0000638 def test_items(self):
639 for key, value in self._reference().items():
640 self.assertEqual(os.environ.get(key), value)
641
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000642 # Issue 7310
643 def test___repr__(self):
644 """Check that the repr() of os.environ looks like environ({...})."""
645 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000646 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
647 '{!r}: {!r}'.format(key, value)
648 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000649
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000650 def test_get_exec_path(self):
651 defpath_list = os.defpath.split(os.pathsep)
652 test_path = ['/monty', '/python', '', '/flying/circus']
653 test_env = {'PATH': os.pathsep.join(test_path)}
654
655 saved_environ = os.environ
656 try:
657 os.environ = dict(test_env)
658 # Test that defaulting to os.environ works.
659 self.assertSequenceEqual(test_path, os.get_exec_path())
660 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
661 finally:
662 os.environ = saved_environ
663
664 # No PATH environment variable
665 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
666 # Empty PATH environment variable
667 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
668 # Supplied PATH environment variable
669 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
670
Victor Stinnerb745a742010-05-18 17:17:23 +0000671 if os.supports_bytes_environ:
672 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000673 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000674 # ignore BytesWarning warning
675 with warnings.catch_warnings(record=True):
676 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000677 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000678 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000679 pass
680 else:
681 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000682
683 # bytes key and/or value
684 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
685 ['abc'])
686 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
687 ['abc'])
688 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
689 ['abc'])
690
691 @unittest.skipUnless(os.supports_bytes_environ,
692 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000693 def test_environb(self):
694 # os.environ -> os.environb
695 value = 'euro\u20ac'
696 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000697 value_bytes = value.encode(sys.getfilesystemencoding(),
698 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000699 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000700 msg = "U+20AC character is not encodable to %s" % (
701 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000702 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000703 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000704 self.assertEqual(os.environ['unicode'], value)
705 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000706
707 # os.environb -> os.environ
708 value = b'\xff'
709 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000710 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000711 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000712 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000713
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Deleting an invalid variable name from os.environ raises an error."""
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        bad_name, expected_error = 'x' * 50000, ValueError
    else:
        # "=" is not allowed in a variable name
        bad_name, expected_error = 'key=', OSError
    self.assertRaises(expected_error, os.environ.__delitem__, bad_name)
Victor Stinner60b385e2011-11-22 22:01:28 +0100727
Victor Stinner6d101392013-04-14 16:35:04 +0200728 def test_key_type(self):
729 missing = 'missingkey'
730 self.assertNotIn(missing, os.environ)
731
Victor Stinner839e5ea2013-04-14 16:43:03 +0200732 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200733 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200734 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200735 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200736
Victor Stinner839e5ea2013-04-14 16:43:03 +0200737 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200738 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200739 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200740 self.assertTrue(cm.exception.__suppress_context__)
741
Victor Stinner6d101392013-04-14 16:35:04 +0200742
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def setUp(self):
        """Create a directory tree under TESTFN and check os.walk() against it.

        Covers top-down traversal, pruning by editing ``dirs`` in place,
        bottom-up traversal and, when symlinks can be created, following a
        directory symlink.
        """
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # "with" guarantees the handle is closed even if write() fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), link_path)
            os.symlink('broken', broken_link_path, True)
            sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        # Sort SUB2's file list so it compares equal to sub2_tree.
        walked[3 - 2 * flipped][-1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        walked[1][-1].sort()
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        """Remove everything below TESTFN, then TESTFN itself."""
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is listed in dirs but must be
                # removed like a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000857
Charles-François Natali7372b062012-02-05 15:15:38 +0100858
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """Check that os.fwalk() reports the same tree as os.walk().

        Both keyword dicts are copied, then every combination of
        topdown / follow-symlinks is tried.  For each one the directories
        and files yielded by fwalk() must match, as sets, what walk()
        yielded for the same root.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # walk() and fwalk() spell the symlink option differently.
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        """fwalk() and walk() agree when both are given only ``top``."""
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        """fwalk() relative to a directory descriptor agrees with walk()."""
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide the real error behind a NameError on ``fd``.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        """The descriptor yielded with each root refers to that directory."""
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        """Repeated fwalk() calls must not leak file descriptors."""
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        """Remove the tree below TESTFN using descriptor-relative calls."""
        # cleanup
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Do not follow symlinks: a link to a directory has to be
                # unlinked, not rmdir'ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
932
933
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs() (plus an os.chown() argument check)."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        """makedirs() creates nested directories, including paths containing '.'."""
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        """exist_ok=True tolerates an existing directory; the default does not."""
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion below fails, so
        # a failure here cannot influence later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
    def test_chown_uid_gid_arguments_must_be_index(self):
        """chown() rejects non-integer uid/gid values with TypeError."""
        # Named "st" so the module-level ``stat`` import is not shadowed.
        st = os.stat(support.TESTFN)
        uid = st.st_uid
        gid = st.st_gid
        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    def test_exist_ok_s_isgid_directory(self):
        """makedirs(exist_ok=True) copes with the S_ISGID bit on the directory."""
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        """makedirs() fails on an existing regular file, whatever exist_ok is."""
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        os.remove(path)

    def tearDown(self):
        """Remove whatever part of the dir1/.../dir6 chain was created."""
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the deepest directory
        # on that chain that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1023
Andrew Svetlov405faed2012-12-25 12:18:09 +02001024
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        """Create TESTFN/dira/dirb and return the pair (dira, dirb)."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        """With every directory empty, the whole chain up to TESTFN is removed."""
        outer, inner = self._make_nested_dirs()
        os.removedirs(inner)
        for gone in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        """A file in the parent stops the removal after the leaf directory."""
        outer, inner = self._make_nested_dirs()
        with open(os.path.join(outer, 'file.txt'), 'w') as blocker:
            blocker.write('text')
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for kept in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        """A file in the leaf makes removedirs() raise and remove nothing."""
        outer, inner = self._make_nested_dirs()
        with open(os.path.join(inner, 'file.txt'), 'w') as blocker:
            blocker.write('text')
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for kept in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1066
1067
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull path."""

    def test_devnull(self):
        """Writes to os.devnull are accepted and reads from it return no data."""
        # The with statement closes the file; no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001075
Andrew Svetlov405faed2012-12-25 12:18:09 +02001076
Guido van Rossume7ba4952007-06-06 23:52:48 +00001077class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001078 def test_urandom_length(self):
1079 self.assertEqual(len(os.urandom(0)), 0)
1080 self.assertEqual(len(os.urandom(1)), 1)
1081 self.assertEqual(len(os.urandom(10)), 10)
1082 self.assertEqual(len(os.urandom(100)), 100)
1083 self.assertEqual(len(os.urandom(1000)), 1000)
1084
1085 def test_urandom_value(self):
1086 data1 = os.urandom(16)
1087 data2 = os.urandom(16)
1088 self.assertNotEqual(data1, data2)
1089
1090 def get_urandom_subprocess(self, count):
1091 code = '\n'.join((
1092 'import os, sys',
1093 'data = os.urandom(%s)' % count,
1094 'sys.stdout.buffer.write(data)',
1095 'sys.stdout.buffer.flush()'))
1096 out = assert_python_ok('-c', code)
1097 stdout = out[1]
1098 self.assertEqual(len(stdout), 16)
1099 return stdout
1100
1101 def test_urandom_subprocess(self):
1102 data1 = self.get_urandom_subprocess(16)
1103 data2 = self.get_urandom_subprocess(16)
1104 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001105
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001106 @unittest.skipUnless(resource, "test requires the resource module")
1107 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001108 # Check urandom() failing when it is not able to open /dev/random.
1109 # We spawn a new process to make the test more robust (if getrlimit()
1110 # failed to restore the file descriptor limit after this, the whole
1111 # test suite would crash; this actually happened on the OS X Tiger
1112 # buildbot).
1113 code = """if 1:
1114 import errno
1115 import os
1116 import resource
1117
1118 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1119 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1120 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001121 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001122 except OSError as e:
1123 assert e.errno == errno.EMFILE, e.errno
1124 else:
1125 raise AssertionError("OSError not raised")
1126 """
1127 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001128
Antoine Pitroue472aea2014-04-26 14:33:03 +02001129 def test_urandom_fd_closed(self):
1130 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1131 # closed.
1132 code = """if 1:
1133 import os
1134 import sys
1135 os.urandom(4)
1136 os.closerange(3, 256)
1137 sys.stdout.buffer.write(os.urandom(4))
1138 """
1139 rc, out, err = assert_python_ok('-Sc', code)
1140
1141 def test_urandom_fd_reopened(self):
1142 # Issue #21207: urandom() should detect its fd to /dev/urandom
1143 # changed to something else, and reopen it.
1144 with open(support.TESTFN, 'wb') as f:
1145 f.write(b"x" * 256)
1146 self.addCleanup(os.unlink, support.TESTFN)
1147 code = """if 1:
1148 import os
1149 import sys
1150 os.urandom(4)
1151 for fd in range(3, 256):
1152 try:
1153 os.close(fd)
1154 except OSError:
1155 pass
1156 else:
1157 # Found the urandom fd (XXX hopefully)
1158 break
1159 os.closerange(3, 256)
1160 with open({TESTFN!r}, 'rb') as f:
1161 os.dup2(f.fileno(), fd)
1162 sys.stdout.buffer.write(os.urandom(4))
1163 sys.stdout.buffer.write(os.urandom(4))
1164 """.format(TESTFN=support.TESTFN)
1165 rc, out, err = assert_python_ok('-Sc', code)
1166 self.assertEqual(len(out), 8)
1167 self.assertNotEqual(out[0:4], out[4:8])
1168 rc, out2, err2 = assert_python_ok('-Sc', code)
1169 self.assertEqual(len(out2), 8)
1170 self.assertNotEqual(out2, out)
1171
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001172
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager that replaces os.execv and os.execve with recording stubs.

    Yields the list the stubs append to; each entry is a tuple of
    (function name, first arg, remaining args).  The stubs always raise
    (RuntimeError for execv, OSError with ENOTDIR for execve) because the
    real functions would normally never return.  When defpath is given,
    os.defpath is replaced too.  All three attributes are restored on exit.
    """
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Remember the real attributes before anything is patched.
    saved = (os.execv, os.execve, os.defpath)
    try:
        os.execv, os.execve = fake_execv, fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv, os.execve, os.defpath = saved
1205
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        """execvpe() on a program name that cannot be found raises OSError."""
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        """execvpe() with an empty argument list raises ValueError."""
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with str or bytes program names.

        test_type is either str or bytes and selects the type of the
        program name (and of the first argument).  The real exec functions
        are stubbed out by _execvpe_mockup(), and the recorded calls are
        compared with the expected ones.
        """
        search_dir = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            argv = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(search_dir), program)
            expected_path = fullpath
        else:
            program = 'executable'
            argv = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            # Except on Windows ("nt"), the path handed to execve is
            # expected in its fsencode()d bytes form.
            expected_path = fullpath if os.name == "nt" else os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            self.assertRaises(RuntimeError, os._execvpe, fullpath, argv)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (argv,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            self.assertRaises(OSError, os._execvpe, program, argv, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', expected_path, (argv, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = search_dir
            self.assertRaises(OSError, os._execvpe, program, argv, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', expected_path, (argv, env_path)))

    def test_internal_execvpe_str(self):
        """Run the internal check for str, and for bytes except on Windows."""
        kinds = (str,) if os.name == "nt" else (str, bytes)
        for kind in kinds:
            self._test_internal_execvpe(kind)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001269
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001270
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Each test asserts that the os call it names raises OSError on TESTFN."""

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # Create TESTFN as a regular file so that mkdir() on the same name
        # has to fail; the file is closed and removed afterwards.
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)
        finally:
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001295
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001296class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001297 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001298 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1299 #singles.append("close")
1300 #We omit close because it doesn'r raise an exception on some platforms
1301 def get_single(f):
1302 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001303 if hasattr(os, f):
1304 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001305 return helper
1306 for f in singles:
1307 locals()["test_"+f] = get_single(f)
1308
Benjamin Peterson7522c742009-01-19 21:00:09 +00001309 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001310 try:
1311 f(support.make_bad_fd(), *args)
1312 except OSError as e:
1313 self.assertEqual(e.errno, errno.EBADF)
1314 else:
1315 self.fail("%r didn't raise a OSError with a bad file descriptor"
1316 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001317
Serhiy Storchaka43767632013-11-03 21:31:38 +02001318 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001319 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001320 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001321
Serhiy Storchaka43767632013-11-03 21:31:38 +02001322 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001323 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001324 fd = support.make_bad_fd()
1325 # Make sure none of the descriptors we are about to close are
1326 # currently valid (issue 6542).
1327 for i in range(10):
1328 try: os.fstat(fd+i)
1329 except OSError:
1330 pass
1331 else:
1332 break
1333 if i < 2:
1334 raise unittest.SkipTest(
1335 "Unable to acquire a range of invalid file descriptors")
1336 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001337
Serhiy Storchaka43767632013-11-03 21:31:38 +02001338 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001339 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001340 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001341
Serhiy Storchaka43767632013-11-03 21:31:38 +02001342 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001343 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001344 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001345
Serhiy Storchaka43767632013-11-03 21:31:38 +02001346 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001347 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001348 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001349
Serhiy Storchaka43767632013-11-03 21:31:38 +02001350 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001351 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001352 self.check(os.pathconf, "PC_NAME_MAX")
1353 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001354
Serhiy Storchaka43767632013-11-03 21:31:38 +02001355 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001356 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001357 self.check(os.truncate, 0)
1358 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001359
Serhiy Storchaka43767632013-11-03 21:31:38 +02001360 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001361 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001362 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001363
Serhiy Storchaka43767632013-11-03 21:31:38 +02001364 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001365 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001366 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001367
Victor Stinner57ddf782014-01-08 15:21:28 +01001368 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1369 def test_readv(self):
1370 buf = bytearray(10)
1371 self.check(os.readv, [buf])
1372
Serhiy Storchaka43767632013-11-03 21:31:38 +02001373 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001374 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001375 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001376
Serhiy Storchaka43767632013-11-03 21:31:38 +02001377 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001378 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001379 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001380
Victor Stinner57ddf782014-01-08 15:21:28 +01001381 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1382 def test_writev(self):
1383 self.check(os.writev, [b'abc'])
1384
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001385 def test_inheritable(self):
1386 self.check(os.get_inheritable)
1387 self.check(os.set_inheritable, True)
1388
1389 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1390 'needs os.get_blocking() and os.set_blocking()')
1391 def test_blocking(self):
1392 self.check(os.get_blocking)
1393 self.check(os.set_blocking, True)
1394
Brian Curtin1b9df392010-11-24 20:24:31 +00001395
1396class LinkTests(unittest.TestCase):
1397 def setUp(self):
1398 self.file1 = support.TESTFN
1399 self.file2 = os.path.join(support.TESTFN + "2")
1400
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001401 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001402 for file in (self.file1, self.file2):
1403 if os.path.exists(file):
1404 os.unlink(file)
1405
Brian Curtin1b9df392010-11-24 20:24:31 +00001406 def _test_link(self, file1, file2):
1407 with open(file1, "w") as f1:
1408 f1.write("test")
1409
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001410 with warnings.catch_warnings():
1411 warnings.simplefilter("ignore", DeprecationWarning)
1412 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001413 with open(file1, "r") as f1, open(file2, "r") as f2:
1414 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1415
1416 def test_link(self):
1417 self._test_link(self.file1, self.file2)
1418
1419 def test_link_bytes(self):
1420 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1421 bytes(self.file2, sys.getfilesystemencoding()))
1422
Brian Curtinf498b752010-11-30 15:54:04 +00001423 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001424 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001425 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001426 except UnicodeError:
1427 raise unittest.SkipTest("Unable to encode for this platform.")
1428
Brian Curtinf498b752010-11-30 15:54:04 +00001429 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001430 self.file2 = self.file1 + "2"
1431 self._test_link(self.file1, self.file2)
1432
Serhiy Storchaka43767632013-11-03 21:31:38 +02001433@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1434class PosixUidGidTests(unittest.TestCase):
1435 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1436 def test_setuid(self):
1437 if os.getuid() != 0:
1438 self.assertRaises(OSError, os.setuid, 0)
1439 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001440
Serhiy Storchaka43767632013-11-03 21:31:38 +02001441 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1442 def test_setgid(self):
1443 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1444 self.assertRaises(OSError, os.setgid, 0)
1445 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001446
Serhiy Storchaka43767632013-11-03 21:31:38 +02001447 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1448 def test_seteuid(self):
1449 if os.getuid() != 0:
1450 self.assertRaises(OSError, os.seteuid, 0)
1451 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001452
Serhiy Storchaka43767632013-11-03 21:31:38 +02001453 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1454 def test_setegid(self):
1455 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1456 self.assertRaises(OSError, os.setegid, 0)
1457 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001458
Serhiy Storchaka43767632013-11-03 21:31:38 +02001459 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1460 def test_setreuid(self):
1461 if os.getuid() != 0:
1462 self.assertRaises(OSError, os.setreuid, 0, 0)
1463 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1464 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001465
Serhiy Storchaka43767632013-11-03 21:31:38 +02001466 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1467 def test_setreuid_neg1(self):
1468 # Needs to accept -1. We run this in a subprocess to avoid
1469 # altering the test runner's process state (issue8045).
1470 subprocess.check_call([
1471 sys.executable, '-c',
1472 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001473
Serhiy Storchaka43767632013-11-03 21:31:38 +02001474 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1475 def test_setregid(self):
1476 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1477 self.assertRaises(OSError, os.setregid, 0, 0)
1478 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1479 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001480
Serhiy Storchaka43767632013-11-03 21:31:38 +02001481 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1482 def test_setregid_neg1(self):
1483 # Needs to accept -1. We run this in a subprocess to avoid
1484 # altering the test runner's process state (issue8045).
1485 subprocess.check_call([
1486 sys.executable, '-c',
1487 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001488
Serhiy Storchaka43767632013-11-03 21:31:38 +02001489@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1490class Pep383Tests(unittest.TestCase):
1491 def setUp(self):
1492 if support.TESTFN_UNENCODABLE:
1493 self.dir = support.TESTFN_UNENCODABLE
1494 elif support.TESTFN_NONASCII:
1495 self.dir = support.TESTFN_NONASCII
1496 else:
1497 self.dir = support.TESTFN
1498 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001499
Serhiy Storchaka43767632013-11-03 21:31:38 +02001500 bytesfn = []
1501 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001502 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001503 fn = os.fsencode(fn)
1504 except UnicodeEncodeError:
1505 return
1506 bytesfn.append(fn)
1507 add_filename(support.TESTFN_UNICODE)
1508 if support.TESTFN_UNENCODABLE:
1509 add_filename(support.TESTFN_UNENCODABLE)
1510 if support.TESTFN_NONASCII:
1511 add_filename(support.TESTFN_NONASCII)
1512 if not bytesfn:
1513 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001514
Serhiy Storchaka43767632013-11-03 21:31:38 +02001515 self.unicodefn = set()
1516 os.mkdir(self.dir)
1517 try:
1518 for fn in bytesfn:
1519 support.create_empty_file(os.path.join(self.bdir, fn))
1520 fn = os.fsdecode(fn)
1521 if fn in self.unicodefn:
1522 raise ValueError("duplicate filename")
1523 self.unicodefn.add(fn)
1524 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001525 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001526 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001527
Serhiy Storchaka43767632013-11-03 21:31:38 +02001528 def tearDown(self):
1529 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001530
Serhiy Storchaka43767632013-11-03 21:31:38 +02001531 def test_listdir(self):
1532 expected = self.unicodefn
1533 found = set(os.listdir(self.dir))
1534 self.assertEqual(found, expected)
1535 # test listdir without arguments
1536 current_directory = os.getcwd()
1537 try:
1538 os.chdir(os.sep)
1539 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1540 finally:
1541 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001542
Serhiy Storchaka43767632013-11-03 21:31:38 +02001543 def test_open(self):
1544 for fn in self.unicodefn:
1545 f = open(os.path.join(self.dir, fn), 'rb')
1546 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001547
Serhiy Storchaka43767632013-11-03 21:31:38 +02001548 @unittest.skipUnless(hasattr(os, 'statvfs'),
1549 "need os.statvfs()")
1550 def test_statvfs(self):
1551 # issue #9645
1552 for fn in self.unicodefn:
1553 # should not fail with file not found error
1554 fullname = os.path.join(self.dir, fn)
1555 os.statvfs(fullname)
1556
1557 def test_stat(self):
1558 for fn in self.unicodefn:
1559 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001560
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, sleeping 0.1s between attempts.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and check that it exits.

        *name* is only used in the failure message.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared mapping when the test finishes.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # The loop waits for the shared byte to become 1.
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # code of 0 must not be mistaken for "did not stop".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1675
1676
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with two sub-directories and two files, and
        # remember their names in sorted order.
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, file_name)
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)
        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)
1721
1722
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic link tests specific to Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: the targets exist and none of the link names do.
        # unittest assertions are used so the checks survive "python -O".
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not."""
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        # Repeat the checks with a bytes path.
        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Bound before the try block so that no cleanup or error path can
        # hit an unbound name when an early step fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Remove the whole tree (file1 and the link included), but only
            # if it was created, so a failed setup is not masked by a
            # secondary cleanup error.
            if os.path.lexists(level1):
                shutil.rmtree(level1)
1840
Brian Curtind40e6f72010-07-08 21:39:08 +00001841
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junctions created with _winapi.CreateJunction()."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # The target must already exist and the junction name must be free.
        assert os.path.exists(self.junction_target)
        assert not os.path.exists(self.junction)

    def tearDown(self):
        if not os.path.exists(self.junction):
            return
        # os.rmdir delegates to Windows' RemoveDirectoryW,
        # which removes junction points safely.
        os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        for predicate in (os.path.exists, os.path.isdir):
            self.assertTrue(predicate(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
1871
1872
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check symlinks whose target is given relative to the link itself."""

    def setUp(self):
        # Raw docstring: the directory sketch contains a backslash that
        # would otherwise be an invalid escape sequence.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion rather than a bare assert, so that the check
        # is not stripped when running under "python -O".
        self.assertTrue(os.path.isdir(src))
1903
1904
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() come back
        # unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here; nothing to round-trip.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001918
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001919
Brett Cannonefb00c02012-02-29 18:31:31 -05001920
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        self.assertIs(os.device_encoding(123456), None)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win')
            or (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        device_encoding = os.device_encoding(0)
        self.assertIsNotNone(device_encoding)
        # The reported name must be one the codecs registry can look up.
        self.assertTrue(codecs.lookup(device_encoding))
1934
1935
class PidTests(unittest.TestCase):
    """Tests for process-id related os functions."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1945
1946
Brian Curtin0151b8e2010-09-24 13:43:43 +00001947# The introduction of this TestCase caused at least two different errors on
1948# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Test for os.getlogin(); currently skipped unconditionally."""

    def test_getlogin(self):
        # The login name must not be empty.
        name_length = len(os.getlogin())
        self.assertNotEqual(name_length, 0)
1955
1956
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                self.skipTest(
                    "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to restore the original priority; an EACCES failure is
            # tolerated, any other OSError propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1979
1980
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Small asyncore-based TCP server that runs in its own thread.

        Each accepted connection is wrapped in a ``Handler`` which greets the
        peer with a "220 ready" line and stores every chunk it receives.  The
        most recent handler is exposed as ``handler_instance`` so a test can
        fetch the received bytes with ``handler_instance.get_data()``.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that accumulates all received data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []   # chunks received so far, in order
                self.closed = False   # set once handle_close() has run
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Propagate the exception being handled instead of using
                # the inherited handle_error().
                raise

        def __init__(self, address):
            """Bind and listen on *address* (a ``(host, port)`` pair)."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Record the address actually bound (the port may have been 0).
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True while the serving loop is supposed to keep going."""
            return self._active

        def start(self):
            """Start the server thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the serving loop to finish and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            try:
                while self._active and asyncore.socket_map:
                    # "with" guarantees the lock is released even when
                    # asyncore.loop() raises (handle_error re-raises).
                    with self._active_lock:
                        asyncore.loop(timeout=0.001, count=1)
            finally:
                # Always close the channels, including on the error path,
                # so a failing test does not leak sockets into later ones.
                asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            # The listening socket never has anything to write.
            return 0

        def handle_error(self):
            # Propagate the exception being handled instead of using the
            # inherited handle_error().
            raise
2064
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002065
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile(), sending support.TESTFN to a local server.

    setUp() starts a SendfileTestServer and connects a client socket to it;
    each test pushes file data through os.sendfile() and then compares what
    the server-side handler received.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are not exercised on Linux and Solaris.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(
        SUPPORT_HEADERS_TRAILERS, 'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY and returns
        the number of bytes reported by os.sendfile(); any other OSError
        (including ECONNRESET) propagates to the caller.
        """
        # None stands in for the empty list so that no mutable default is
        # shared between calls.
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                # disconnected (ECONNRESET) or any other failure
                raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the payload itself (length first, as the other tests do)
        # rather than only the hashes of the two byte strings.
        self.assertEqual(len(data), len(expected_data))
        self.assertEqual(data, expected_data)

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"
        # Register the cleanup before the file is created so that it is
        # removed even if writing or reopening it fails.
        self.addCleanup(support.unlink, TESTFN2)
        with open(TESTFN2, 'wb') as f:
            f.write(file_data)
        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # With SF_NODISKIO the call may fail with EBUSY/EAGAIN; both are
        # tolerated, any other error is a failure.
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002242
2243
def supports_extended_attributes():
    """Return True if extended attributes are usable for the tests.

    The answer is False when os.setxattr() does not exist, when setting a
    "user.test" attribute on a scratch file (support.TESTFN) raises
    OSError, or when platform.release() reports a 2.6.x kernel older than
    2.6.39.  The scratch file is always removed again.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: "\d" is not a valid str escape, and a
    # bare "." would match any character instead of a literal dot.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2259
2260
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise the os.*xattr() functions on the support.TESTFN file."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # *s* converts an attribute name to the type under test (str or
        # bytes); the four callables are the implementations to exercise and
        # **kwargs is forwarded to getxattr/setxattr/removexattr.
        path = support.TESTFN
        with open(path, "wb"):
            pass

        def expect_errno(code, func, *args):
            # func(path, *args, **kwargs) must fail with OSError(code).
            with self.assertRaises(OSError) as caught:
                func(path, *args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        # Nothing is set yet.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))
        baseline = listxattr(path)
        self.assertIsInstance(baseline, list)

        # Create, read back and replace "user.test".
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(baseline)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE / XATTR_REPLACE must respect the current state.
        expect_errno(errno.EEXIST,
                     setxattr, s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA,
                     setxattr, s("user.test2"), b"bye", os.XATTR_REPLACE)
        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removal makes the attribute disappear again.
        removexattr(path, s("user.test"), **kwargs)
        expect_errno(errno.ENODATA, getxattr, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # A larger value round-trips as well.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)

        # Many attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(baseline) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario twice: with str names, then with bytes names.
        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(lambda text: bytes(text, "ascii"),
                               *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call goes through a file descriptor
        # opened on the path just for that call.
        def getxattr(path, *args):
            with open(path, "rb") as handle:
                return os.getxattr(handle.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb") as handle:
                os.setxattr(handle.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb") as handle:
                os.removexattr(handle.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as handle:
                return os.listxattr(handle.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2336
2337
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Each listed call with a bytes filename must raise DeprecationWarning
    once warnings are turned into errors."""

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # (callable, positional arguments...) for each API under test.
        calls = [
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        ]
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            for call in calls:
                func, args = call[0], call[1:]
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(filename, filename)
2373
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002374
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Tests for os.get_terminal_size()."""

    def _get_terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args), skipping on expected errors.

        The test is skipped for any OSError on win32 and for EINVAL/ENOTTY
        elsewhere; every other OSError propagates.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        if sys.__stdin__ is None:
            # Without an original stdin there is nothing to compare against
            # (and sys.__stdin__.fileno() below would raise AttributeError).
            self.skipTest("stdin is not available")
        try:
            # stderr is discarded so a failing stty does not clutter the
            # test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2417
2418
class OSErrorTests(unittest.TestCase):
    """OSError raised by os functions must carry the exact filename object
    that was passed in."""

    def setUp(self):
        class Str(str):
            pass

        # Prefer the "awkward" names provided by test.support when available.
        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        self.unicode_filenames = [decoded, Str(decoded)]
        self.bytes_filenames = [encoded, memoryview(encoded)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"
        every = self.filenames
        only_bytes = self.bytes_filenames
        only_str = self.unicode_filenames

        # Each entry: (names to try, function, extra positional args...).
        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (only_bytes, os.rename, b"dst"),
                (only_bytes, os.replace, b"dst"),
                (only_str, os.rename, "dst"),
                (only_str, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (only_str, os.listdir,),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that exist only on some platforms, with their extra
        # positional arguments.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((only_bytes, os.link, b"dst"))
                funcs.append((only_str, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append(((only_str if on_windows else every), os.readlink,))

        for candidates, func, *extra_args in funcs:
            for name in candidates:
                try:
                    func(name, *extra_args)
                except OSError as err:
                    self.assertIs(err.filename, name)
                else:
                    self.fail("No exception thrown by {}".format(func))
2515
class CPUCountTests(unittest.TestCase):
    """os.cpu_count() returns either None or a positive int."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2524
Victor Stinnerdaf45552013-08-28 00:53:59 +02002525
class FDInheritanceTests(unittest.TestCase):
    """File descriptors created through the os module are expected to be
    non-inheritable unless inheritance is requested explicitly."""

    def _open_self(self):
        # Open this source file read-only and close it again at cleanup.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_self()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_self()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        without_cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, without_cloexec)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_self()

        def cloexec_bit():
            return fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC

        self.assertEqual(cloexec_bit(), fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        self.assertEqual(cloexec_bit(), 0)

    def test_open(self):
        fd = self._open_self()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        pipe_fds = os.pipe()
        for fd in pipe_fds:
            self.addCleanup(os.close, fd)
        for fd in pipe_fds:
            self.assertEqual(os.get_inheritable(fd), False)

    def test_dup(self):
        original = self._open_self()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = self._open_self()

        scenarios = (
            ({}, True),                       # inheritable by default
            ({'inheritable': False}, False),  # force non-inheritable
        )
        for dup2_kwargs, expected in scenarios:
            target_fd = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(source_fd, target_fd, **dup2_kwargs)
                self.assertEqual(os.get_inheritable(target_fd), expected)
            finally:
                os.close(target_fd)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pty_fds = os.openpty()
        for fd in pty_fds:
            self.addCleanup(os.close, fd)
        for fd in pty_fds:
            self.assertEqual(os.get_inheritable(fd), False)
2608
2609
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip checks for os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor starts out in blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, reading the mode after each step.
        for wanted in (False, True):
            os.set_blocking(fd, wanted)
            self.assertEqual(os.get_blocking(fd), wanted)
2623
2624
Yury Selivanov97e2e062014-09-26 12:33:06 -04002625
class ExportsTests(unittest.TestCase):
    """Spot-check the contents of os.__all__."""

    def test_os_all(self):
        exported = os.__all__
        for name in ('open', 'walk'):
            self.assertIn(name, exported)
2630
2631
@support.reap_threads
def test_main():
    """Run every test case class of this module via support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32ListdirTests,
        Win32SymlinkTests,
        NonLocalSymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
        OSErrorTests,
        RemoveDirsTests,
        CPUCountTests,
        FDInheritanceTests,
        Win32JunctionTests,
        BlockingTests,
        ExportsTests,
    )
    support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()