blob: ea2949a3778df3e0650c5530a84a642203e5db93 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070027import decimal
28import fractions
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000029try:
30 import threading
31except ImportError:
32 threading = None
Georg Brandl2daf6ae2012-02-20 19:54:16 +010033from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000034
# Ask os.stat() to report float timestamps.  os.stat_float_times() is
# deprecated (hence the suppressed DeprecationWarning) and is missing from
# newer Python versions, so only call it when it exists.
if hasattr(os, "stat_float_times"):
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        os.stat_float_times(True)
st = os.stat(__file__)
stat_supports_subsecond = (
    # check if float and int timestamps are different
    (st.st_atime != st[7])
    or (st.st_mtime != st[8])
    or (st.st_ctime != st[9]))

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
if hasattr(sys, 'thread_info') and sys.thread_info.version:
    USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
else:
    USING_LINUXTHREADS = False

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
56
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Low-level file operation tests; each one works on support.TESTFN."""

    def setUp(self):
        # Make sure no TESTFN is lying around.  The same code doubles as
        # tearDown (see the alias below) to clean up after each test.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the refcount of its
        # path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a check above fails, so a
            # failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command given by args and check that it exits with 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        # The existing destination must be overwritten by the source.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
163
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200164
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on os.stat() / os.statvfs() results and on os.utime().

    Each test runs against a directory support.TESTFN that contains one
    three-byte file, "f1" (self.fname).
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        """Verify the tuple and attribute interfaces of os.stat(fname).

        fname is the path of the three-byte test file (the callers pass it
        both as str and as bytes).
        """
        if not hasattr(os, "stat"):
            self.skipTest("os.stat() required for this test.")

        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # The attribute may be a float while the indexed item
                    # is an int, so compare whole seconds.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            self.skipTest("os.statvfs() required for this test.")

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest("os.statvfs() failed with ENOSYS")
            # Any other error is a real failure; swallowing it would only
            # resurface below as a NameError on 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for index, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[index])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def _test_utime(self, filename, attr, utime, delta):
        """Exercise the different ways of setting the times of filename.

        attr(stat_result, name) reads a timestamp field, utime(file, times)
        sets (atime, mtime) in the same unit, and delta is the tolerance
        (in that unit) allowed between two "set to now" calls.
        """
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Stat the file that was just touched (not unconditionally TESTFN),
        # so both sides of the comparison below refer to the same path.
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"),
                               delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)

    def _test_utime_ns(self, set_times_ns, test_dir=True):
        """Run _test_utime() on the *_ns fields with a ten second tolerance.

        set_times_ns(file, times) must set (atime_ns, mtime_ns).  The
        directory TESTFN is checked as well unless test_dir is false.
        """
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    # Skip decorators for the optional os.utime() features.
    requires_utime_dir_fd = unittest.skipUnless(
                                os.utime in os.supports_dir_fd,
                                "dir_fd support for utime required for this test.")
    requires_utime_fd = unittest.skipUnless(
                                os.utime in os.supports_fd,
                                "fd support for utime required for this test.")
    requires_utime_nofollow_symlinks = unittest.skipUnless(
                                os.utime in os.supports_follow_symlinks,
                                "follow_symlinks support for utime required for this test.")

    @requires_utime_nofollow_symlinks
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.utime(file, ns=times, follow_symlinks=False)
        self._test_utime_ns(lutimes_ns)

    @requires_utime_fd
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.utime(f.fileno(), ns=times)
        # The setter opens its target as a regular file, so the directory
        # case is left out.
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        """Check that os.<name>(arg, ...) rejects both times and ns at once."""
        with self.assertRaises(ValueError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)

    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        """Set sub-second times through set_time_func and read them back.

        set_time_func(filename, atime, mtime) receives float timestamps in
        seconds; os.stat() must then report them to millisecond precision.
        """
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        # os.stat_float_times() is deprecated and missing from newer Python
        # versions, so only call it when it exists.
        if hasattr(os, "stat_float_times"):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                         times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_utime_nofollow_symlinks
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime), follow_symlinks=False)
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                         times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    # NOTE(review): the nesting of the definitions below under the win32 and
    # NTFS guards was reconstructed from context -- confirm against upstream.
    if sys.platform == 'win32':
        def get_file_system(path):
            # Returns the file system name reported for the drive holding
            # path, or None when the query fails.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None,
                                              buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except FileNotFoundError:
                pass # file does not exist; cannot run test
            except OSError:
                self.fail("Could not stat pagefile.sys")

        @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
        def test_15261(self):
            # Verify that stat'ing a closed fd does not cause crash
            r, w = os.pipe()
            try:
                os.stat(r)          # should not raise error
            finally:
                os.close(r)
                os.close(w)
            with self.assertRaises(OSError) as ctx:
                os.stat(r)
            self.assertEqual(ctx.exception.errno, errno.EBADF)
493
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000494from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000495
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the real environment so tearDown() can restore it, then
        # seed os.environ with the reference mapping.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        os.environ.update(self._reference())

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        # The variable set through update() must be visible to a child
        # process.
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
    def test_os_popen_iter(self):
        with os.popen(
            "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
            it = iter(popen)
            self.assertEqual(next(it), "line1\n")
            self.assertEqual(next(it), "line2\n")
            self.assertEqual(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily replace os.environ with a plain dict; the finally
        # clause puts the real object back even if an assertion fails.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # The KeyError must carry the very key object that was passed in,
        # for lookup as well as for deletion.
        with self.assertRaises(KeyError) as cm:
            os.environ[missing]
        self.assertIs(cm.exception.args[0], missing)

        with self.assertRaises(KeyError) as cm:
            del os.environ[missing]
        self.assertIs(cm.exception.args[0], missing)
Victor Stinner6d101392013-04-14 16:35:04 +0200648
Tim Petersc4e09402003-04-25 07:11:48 +0000649class WalkTests(unittest.TestCase):
650 """Tests for os.walk()."""
651
Charles-François Natali7372b062012-02-05 15:15:38 +0100652 def setUp(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000653 import os
654 from os.path import join
655
656 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000657 # TESTFN/
658 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000659 # tmp1
660 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000661 # tmp2
662 # SUB11/ no kids
663 # SUB2/ a file kid and a dirsymlink kid
664 # tmp3
665 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200666 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000667 # TEST2/
668 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000669 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000670 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000671 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000672 sub2_path = join(walk_path, "SUB2")
673 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000674 tmp2_path = join(sub1_path, "tmp2")
675 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000676 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000677 t2_path = join(support.TESTFN, "TEST2")
678 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200679 link_path = join(sub2_path, "link")
680 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000681
682 # Create stuff.
683 os.makedirs(sub11_path)
684 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000685 os.makedirs(t2_path)
686 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000687 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000688 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
689 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000690 if support.can_symlink():
Jason R. Coombs3a092862013-05-27 23:21:28 -0400691 os.symlink(os.path.abspath(t2_path), link_path)
Jason R. Coombsb501b562013-05-27 23:52:43 -0400692 os.symlink('broken', broken_link_path, True)
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200693 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
Guido van Rossumd8faa362007-04-27 19:54:29 +0000694 else:
695 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000696
697 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000698 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000699 self.assertEqual(len(all), 4)
700 # We can't know which order SUB1 and SUB2 will appear in.
701 # Not flipped: TESTFN, SUB1, SUB11, SUB2
702 # flipped: TESTFN, SUB2, SUB1, SUB11
703 flipped = all[0][1][0] != "SUB1"
704 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200705 all[3 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000706 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000707 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
708 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000709 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000710
711 # Prune the search.
712 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000713 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000714 all.append((root, dirs, files))
715 # Don't descend into SUB1.
716 if 'SUB1' in dirs:
717 # Note that this also mutates the dirs we appended to all!
718 dirs.remove('SUB1')
719 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000720 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200721 all[1][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000722 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000723
724 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000725 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000726 self.assertEqual(len(all), 4)
727 # We can't know which order SUB1 and SUB2 will appear in.
728 # Not flipped: SUB11, SUB1, SUB2, TESTFN
729 # flipped: SUB2, SUB11, SUB1, TESTFN
730 flipped = all[3][1][0] != "SUB1"
731 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200732 all[2 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000733 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000734 self.assertEqual(all[flipped], (sub11_path, [], []))
735 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000736 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000737
Brian Curtin3b4499c2010-12-28 14:31:47 +0000738 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000739 # Walk, following symlinks.
740 for root, dirs, files in os.walk(walk_path, followlinks=True):
741 if root == link_path:
742 self.assertEqual(dirs, [])
743 self.assertEqual(files, ["tmp4"])
744 break
745 else:
746 self.fail("Didn't follow symlink with followlinks=True")
747
748 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000749 # Tear everything down. This is a decent use for bottom-up on
750 # Windows, which doesn't have a recursive delete command. The
751 # (not so) subtlety is that rmdir will fail unless the dir's
752 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000753 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000754 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000755 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000756 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000757 dirname = os.path.join(root, name)
758 if not os.path.islink(dirname):
759 os.rmdir(dirname)
760 else:
761 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000762 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000763
Charles-François Natali7372b062012-02-05 15:15:38 +0100764
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        walk_kwargs and fwalk_kwargs are keyword arguments for os.walk()
        and os.fwalk() respectively.  Both are copied before use, then every
        combination of topdown / follow-symlinks is tried and each directory
        reported by fwalk() must match what walk() reported for it.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # The same option is spelled differently by the two functions.
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            # Sets are used because the listing order is not significant.
            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor *before* entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide that error behind a NameError on 'fd'.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # positional arguments: top, topdown, onerror
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Do not follow symlinks: a link to a directory has to be
                # unlinked, only a real directory can be rmdir()'ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
838
839
Guido van Rossume7ba4952007-06-06 23:52:48 +0000840class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000841 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000842 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000843
844 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000845 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000846 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
847 os.makedirs(path) # Should work
848 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
849 os.makedirs(path)
850
851 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000852 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000853 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
854 os.makedirs(path)
855 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
856 'dir5', 'dir6')
857 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000858
Terry Reedy5a22b652010-12-02 07:05:56 +0000859 def test_exist_ok_existing_directory(self):
860 path = os.path.join(support.TESTFN, 'dir1')
861 mode = 0o777
862 old_mask = os.umask(0o022)
863 os.makedirs(path, mode)
864 self.assertRaises(OSError, os.makedirs, path, mode)
865 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
866 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
867 os.makedirs(path, mode=mode, exist_ok=True)
868 os.umask(old_mask)
869
Terry Jan Reedy4a0b6f72013-08-10 20:58:59 -0400870 @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
Larry Hastingsa27b83a2013-08-08 00:19:50 -0700871 def test_chown_uid_gid_arguments_must_be_index(self):
872 stat = os.stat(support.TESTFN)
873 uid = stat.st_uid
874 gid = stat.st_gid
875 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
876 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
877 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
878 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
879 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
880
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700881 def test_exist_ok_s_isgid_directory(self):
882 path = os.path.join(support.TESTFN, 'dir1')
883 S_ISGID = stat.S_ISGID
884 mode = 0o777
885 old_mask = os.umask(0o022)
886 try:
887 existing_testfn_mode = stat.S_IMODE(
888 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -0700889 try:
890 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -0700891 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -0700892 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700893 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
894 raise unittest.SkipTest('No support for S_ISGID dir mode.')
895 # The os should apply S_ISGID from the parent dir for us, but
896 # this test need not depend on that behavior. Be explicit.
897 os.makedirs(path, mode | S_ISGID)
898 # http://bugs.python.org/issue14992
899 # Should not fail when the bit is already set.
900 os.makedirs(path, mode, exist_ok=True)
901 # remove the bit.
902 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
903 with self.assertRaises(OSError):
904 # Should fail when the bit is not already set when demanded.
905 os.makedirs(path, mode | S_ISGID, exist_ok=True)
906 finally:
907 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +0000908
909 def test_exist_ok_existing_regular_file(self):
910 base = support.TESTFN
911 path = os.path.join(support.TESTFN, 'dir1')
912 f = open(path, 'w')
913 f.write('abc')
914 f.close()
915 self.assertRaises(OSError, os.makedirs, path)
916 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
917 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
918 os.remove(path)
919
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000920 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000921 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000922 'dir4', 'dir5', 'dir6')
923 # If the tests failed, the bottom-most directory ('../dir6')
924 # may not have been created, so we look for the outermost directory
925 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000926 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000927 path = os.path.dirname(path)
928
929 os.removedirs(path)
930
Andrew Svetlov405faed2012-12-25 12:18:09 +0200931
932class RemoveDirsTests(unittest.TestCase):
933 def setUp(self):
934 os.makedirs(support.TESTFN)
935
936 def tearDown(self):
937 support.rmtree(support.TESTFN)
938
939 def test_remove_all(self):
940 dira = os.path.join(support.TESTFN, 'dira')
941 os.mkdir(dira)
942 dirb = os.path.join(dira, 'dirb')
943 os.mkdir(dirb)
944 os.removedirs(dirb)
945 self.assertFalse(os.path.exists(dirb))
946 self.assertFalse(os.path.exists(dira))
947 self.assertFalse(os.path.exists(support.TESTFN))
948
949 def test_remove_partial(self):
950 dira = os.path.join(support.TESTFN, 'dira')
951 os.mkdir(dira)
952 dirb = os.path.join(dira, 'dirb')
953 os.mkdir(dirb)
954 with open(os.path.join(dira, 'file.txt'), 'w') as f:
955 f.write('text')
956 os.removedirs(dirb)
957 self.assertFalse(os.path.exists(dirb))
958 self.assertTrue(os.path.exists(dira))
959 self.assertTrue(os.path.exists(support.TESTFN))
960
961 def test_remove_nothing(self):
962 dira = os.path.join(support.TESTFN, 'dira')
963 os.mkdir(dira)
964 dirb = os.path.join(dira, 'dirb')
965 os.mkdir(dirb)
966 with open(os.path.join(dirb, 'file.txt'), 'w') as f:
967 f.write('text')
968 with self.assertRaises(OSError):
969 os.removedirs(dirb)
970 self.assertTrue(os.path.exists(dirb))
971 self.assertTrue(os.path.exists(dira))
972 self.assertTrue(os.path.exists(support.TESTFN))
973
974
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        # Writing to the null device must succeed...
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        # ...and reading from it must yield no data at all.  (The with
        # statement closes the file; no explicit close() is needed.)
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000982
Andrew Svetlov405faed2012-12-25 12:18:09 +0200983
Guido van Rossume7ba4952007-06-06 23:52:48 +0000984class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +0100985 def test_urandom_length(self):
986 self.assertEqual(len(os.urandom(0)), 0)
987 self.assertEqual(len(os.urandom(1)), 1)
988 self.assertEqual(len(os.urandom(10)), 10)
989 self.assertEqual(len(os.urandom(100)), 100)
990 self.assertEqual(len(os.urandom(1000)), 1000)
991
992 def test_urandom_value(self):
993 data1 = os.urandom(16)
994 data2 = os.urandom(16)
995 self.assertNotEqual(data1, data2)
996
997 def get_urandom_subprocess(self, count):
998 code = '\n'.join((
999 'import os, sys',
1000 'data = os.urandom(%s)' % count,
1001 'sys.stdout.buffer.write(data)',
1002 'sys.stdout.buffer.flush()'))
1003 out = assert_python_ok('-c', code)
1004 stdout = out[1]
1005 self.assertEqual(len(stdout), 16)
1006 return stdout
1007
1008 def test_urandom_subprocess(self):
1009 data1 = self.get_urandom_subprocess(16)
1010 data2 = self.get_urandom_subprocess(16)
1011 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001012
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001013@contextlib.contextmanager
1014def _execvpe_mockup(defpath=None):
1015 """
1016 Stubs out execv and execve functions when used as context manager.
1017 Records exec calls. The mock execv and execve functions always raise an
1018 exception as they would normally never return.
1019 """
1020 # A list of tuples containing (function name, first arg, args)
1021 # of calls to execv or execve that have been made.
1022 calls = []
1023
1024 def mock_execv(name, *args):
1025 calls.append(('execv', name, args))
1026 raise RuntimeError("execv called")
1027
1028 def mock_execve(name, *args):
1029 calls.append(('execve', name, args))
1030 raise OSError(errno.ENOTDIR, "execve called")
1031
1032 try:
1033 orig_execv = os.execv
1034 orig_execve = os.execve
1035 orig_defpath = os.defpath
1036 os.execv = mock_execv
1037 os.execve = mock_execve
1038 if defpath is not None:
1039 os.defpath = defpath
1040 yield calls
1041 finally:
1042 os.execv = orig_execv
1043 os.execve = orig_execve
1044 os.defpath = orig_defpath
1045
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        bogus = 'no such app-'
        with self.assertRaises(OSError):
            os.execvpe(bogus, [bogus], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list is rejected with ValueError.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with str or bytes program names (selected by
        # test_type) while execv/execve are stubbed out, and check which
        # stub was called with which arguments.
        program_path = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        # The bytes variant is only exercised where os.name is not "nt".
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001109
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001110
Thomas Wouters477c8d52006-05-27 19:21:47 +00001111class Win32ErrorTests(unittest.TestCase):
1112 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001113 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001114
1115 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001116 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001117
1118 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001119 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001120
1121 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001122 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001123 try:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001124 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Benjamin Petersonf91df042009-02-13 02:50:59 +00001125 finally:
1126 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001127 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001128
1129 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001130 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001131
Thomas Wouters477c8d52006-05-27 19:21:47 +00001132 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001133 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001134
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001135class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001136 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001137 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1138 #singles.append("close")
1139 #We omit close because it doesn'r raise an exception on some platforms
1140 def get_single(f):
1141 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001142 if hasattr(os, f):
1143 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001144 return helper
1145 for f in singles:
1146 locals()["test_"+f] = get_single(f)
1147
Benjamin Peterson7522c742009-01-19 21:00:09 +00001148 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001149 try:
1150 f(support.make_bad_fd(), *args)
1151 except OSError as e:
1152 self.assertEqual(e.errno, errno.EBADF)
1153 else:
1154 self.fail("%r didn't raise a OSError with a bad file descriptor"
1155 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001156
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001157 def test_isatty(self):
1158 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001159 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001160
1161 def test_closerange(self):
1162 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001163 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +00001164 # Make sure none of the descriptors we are about to close are
1165 # currently valid (issue 6542).
1166 for i in range(10):
1167 try: os.fstat(fd+i)
1168 except OSError:
1169 pass
1170 else:
1171 break
1172 if i < 2:
1173 raise unittest.SkipTest(
1174 "Unable to acquire a range of invalid file descriptors")
1175 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001176
1177 def test_dup2(self):
1178 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001179 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001180
1181 def test_fchmod(self):
1182 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001183 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001184
1185 def test_fchown(self):
1186 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001187 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001188
1189 def test_fpathconf(self):
1190 if hasattr(os, "fpathconf"):
Georg Brandl306336b2012-06-24 12:55:33 +02001191 self.check(os.pathconf, "PC_NAME_MAX")
Benjamin Peterson7522c742009-01-19 21:00:09 +00001192 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001193
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001194 def test_ftruncate(self):
1195 if hasattr(os, "ftruncate"):
Georg Brandl306336b2012-06-24 12:55:33 +02001196 self.check(os.truncate, 0)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001197 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001198
1199 def test_lseek(self):
1200 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001201 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001202
1203 def test_read(self):
1204 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001205 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001206
1207 def test_tcsetpgrpt(self):
1208 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001209 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001210
1211 def test_write(self):
1212 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001213 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001214
Brian Curtin1b9df392010-11-24 20:24:31 +00001215
1216class LinkTests(unittest.TestCase):
1217 def setUp(self):
1218 self.file1 = support.TESTFN
1219 self.file2 = os.path.join(support.TESTFN + "2")
1220
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001221 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001222 for file in (self.file1, self.file2):
1223 if os.path.exists(file):
1224 os.unlink(file)
1225
Brian Curtin1b9df392010-11-24 20:24:31 +00001226 def _test_link(self, file1, file2):
1227 with open(file1, "w") as f1:
1228 f1.write("test")
1229
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001230 with warnings.catch_warnings():
1231 warnings.simplefilter("ignore", DeprecationWarning)
1232 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001233 with open(file1, "r") as f1, open(file2, "r") as f2:
1234 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1235
1236 def test_link(self):
1237 self._test_link(self.file1, self.file2)
1238
1239 def test_link_bytes(self):
1240 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1241 bytes(self.file2, sys.getfilesystemencoding()))
1242
Brian Curtinf498b752010-11-30 15:54:04 +00001243 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001244 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001245 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001246 except UnicodeError:
1247 raise unittest.SkipTest("Unable to encode for this platform.")
1248
Brian Curtinf498b752010-11-30 15:54:04 +00001249 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001250 self.file2 = self.file1 + "2"
1251 self._test_link(self.file1, self.file2)
1252
if sys.platform != 'win32':
    # Replaces any earlier Win32ErrorTests definition with an empty case on
    # non-Windows platforms.
    class Win32ErrorTests(unittest.TestCase):
        pass

    class PosixUidGidTests(unittest.TestCase):
        """Argument checks for the os.set*uid() / os.set*gid() family.

        Each test is skipped (rather than silently left undefined) when the
        corresponding function is missing from the os module.
        """

        @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
        def test_setuid(self):
            if os.getuid() != 0:
                self.assertRaises(OSError, os.setuid, 0)
            self.assertRaises(OverflowError, os.setuid, 1<<32)

        @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
        def test_setgid(self):
            if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
                self.assertRaises(OSError, os.setgid, 0)
            self.assertRaises(OverflowError, os.setgid, 1<<32)

        @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
        def test_seteuid(self):
            if os.getuid() != 0:
                self.assertRaises(OSError, os.seteuid, 0)
            self.assertRaises(OverflowError, os.seteuid, 1<<32)

        @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
        def test_setegid(self):
            if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
                self.assertRaises(OSError, os.setegid, 0)
            self.assertRaises(OverflowError, os.setegid, 1<<32)

        @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
        def test_setreuid(self):
            if os.getuid() != 0:
                self.assertRaises(OSError, os.setreuid, 0, 0)
            self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
            self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)

        @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
        def test_setreuid_neg1(self):
            # Needs to accept -1.  We run this in a subprocess to avoid
            # altering the test runner's process state (issue8045).
            subprocess.check_call([
                    sys.executable, '-c',
                    'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

        @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
        def test_setregid(self):
            if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
                self.assertRaises(OSError, os.setregid, 0, 0)
            self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
            self.assertRaises(OverflowError, os.setregid, 0, 1<<32)

        @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
        def test_setregid_neg1(self):
            # Needs to accept -1.  We run this in a subprocess to avoid
            # altering the test runner's process state (issue8045).
            subprocess.check_call([
                    sys.executable, '-c',
                    'import os,sys;os.setregid(-1,-1);sys.exit(0)'])

    class Pep383Tests(unittest.TestCase):
        """Exercise os functions on a directory of non-ASCII file names."""

        def setUp(self):
            # Pick the most demanding directory name the support module
            # offers, falling back to the plain scratch name.
            if support.TESTFN_UNENCODABLE:
                self.dir = support.TESTFN_UNENCODABLE
            elif support.TESTFN_NONASCII:
                self.dir = support.TESTFN_NONASCII
            else:
                self.dir = support.TESTFN
            self.bdir = os.fsencode(self.dir)

            bytesfn = []
            def add_filename(fn):
                # Names that cannot be encoded are silently left out.
                try:
                    fn = os.fsencode(fn)
                except UnicodeEncodeError:
                    return
                bytesfn.append(fn)
            add_filename(support.TESTFN_UNICODE)
            if support.TESTFN_UNENCODABLE:
                add_filename(support.TESTFN_UNENCODABLE)
            if support.TESTFN_NONASCII:
                add_filename(support.TESTFN_NONASCII)
            if not bytesfn:
                self.skipTest("couldn't create any non-ascii filename")

            self.unicodefn = set()
            os.mkdir(self.dir)
            try:
                for fn in bytesfn:
                    support.create_empty_file(os.path.join(self.bdir, fn))
                    fn = os.fsdecode(fn)
                    if fn in self.unicodefn:
                        raise ValueError("duplicate filename")
                    self.unicodefn.add(fn)
            except BaseException:
                # tearDown() does not run when setUp() fails, so remove the
                # directory here before propagating the error.
                shutil.rmtree(self.dir)
                raise

        def tearDown(self):
            shutil.rmtree(self.dir)

        def test_listdir(self):
            expected = self.unicodefn
            found = set(os.listdir(self.dir))
            self.assertEqual(found, expected)
            # test listdir without arguments
            current_directory = os.getcwd()
            try:
                os.chdir(os.sep)
                self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
            finally:
                os.chdir(current_directory)

        def test_open(self):
            for fn in self.unicodefn:
                # Opening is the whole check; the context manager guarantees
                # the handle is closed again.
                with open(os.path.join(self.dir, fn), 'rb'):
                    pass

        @unittest.skipUnless(hasattr(os, 'statvfs'),
                             "need os.statvfs()")
        def test_statvfs(self):
            # issue #9645
            for fn in self.unicodefn:
                # should not fail with file not found error
                fullname = os.path.join(self.dir, fn)
                os.statvfs(fullname)

        def test_stat(self):
            for fn in self.unicodefn:
                os.stat(os.path.join(self.dir, fn))
else:
    # On Windows these cases are defined empty so the names still exist.
    class PosixUidGidTests(unittest.TestCase):
        pass
    class Pep383Tests(unittest.TestCase):
        pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001385
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_attempts times, 0.1 second apart.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Reached when the loop condition turned false without a break.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Shared one-byte mapping the child uses to report that it has
        # finished initialising (it is expected to set the byte to 1).
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test ends, whatever the outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # status of 0 also means it has stopped, so compare against None
        # instead of relying on truthiness.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1500
1501
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic link tests that only run on Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: the targets exist and no link is left over.  These
        # use unittest assertions so they still run under "python -O".
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() because the missing link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not."""
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        # Repeat the checks with the link name given as bytes.
        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so that the cleanup in "finally" can
        # always refer to it, even when creating the directories fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so a cleanup error never
            # hides the failure that happened inside the try block.
            if os.path.lexists(file1):
                os.remove(file1)
            if os.path.isdir(level1):
                shutil.rmtree(level1)
1619
Brian Curtind40e6f72010-07-08 21:39:08 +00001620
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion instead of a bare "assert": the latter is
        # stripped under "python -O", which would make this test vacuous.
        self.assertTrue(os.path.isdir(src))
1650 assert os.path.isdir(src)
1651
1652
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # A value that already has the target type must come back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # The name cannot be encoded here, so there is nothing to
                # round-trip.
                continue
            decoded = os.fsdecode(encoded)
            self.assertEqual(decoded, name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001666
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001667
Brett Cannonefb00c02012-02-29 18:31:31 -05001668
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        encoding = os.device_encoding(0)
        self.assertIsNotNone(encoding)
        # The reported name must be one that the codecs registry can resolve.
        codec_info = codecs.lookup(encoding)
        self.assertTrue(codec_info)
1682
1683
class PidTests(unittest.TestCase):
    """Checks for process id functions of the os module."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid on stdout.
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1692 self.assertEqual(int(stdout), os.getpid())
1693
1694
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check for os.getlogin(); currently skipped unconditionally."""

    def test_getlogin(self):
        # The login name must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
1703
1704
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        original = os.getpriority(os.PRIO_PROCESS, os.getpid())
        target = original + 1
        os.setpriority(os.PRIO_PROCESS, os.getpid(), target)
        try:
            current = os.getpriority(os.PRIO_PROCESS, os.getpid())
            if original >= 19 and current <= 19:
                raise unittest.SkipTest(
      "unable to reliably test setpriority at current nice level of %s" % original)
            self.assertEqual(current, target)
        finally:
            # Try to restore the starting value; a permission error while
            # doing so is tolerated, anything else propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, os.getpid(), original)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1727
1728
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket served by an asyncore loop in its own thread.

        The handler created for an accepted connection is stored in
        ``handler_instance``; it collects everything the peer sends.
        """

        class Handler(asynchat.async_chat):
            """Per-connection handler that buffers all received data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            # Block until run() has signalled that it started.
            self.__flag.wait()

        def stop(self):
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # "with" releases the lock even if the loop iteration raises
                # (handle_error() re-raises handler exceptions).
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1812
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001813
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are not exercised on these platforms.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; every other
        OSError (including ECONNRESET, i.e. disconnected) propagates.
        Returns whatever os.sendfile() returns.  *headers* and *trailers*
        are only passed on when SUPPORT_HEADERS_TRAILERS is true.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes,
                                       list(headers), list(trailers))
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    @unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
                         'requires headers and trailers support')
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the payload itself (length first, as the other tests do)
        # instead of only comparing hashes.
        self.assertEqual(len(data), len(expected_data))
        self.assertEqual(data, expected_data)

    @unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
                         'requires headers and trailers support')
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        # Registered before the file is created so that it is removed even
        # if a later step fails.
        self.addCleanup(support.unlink, TESTFN2)
        with open(TESTFN2, 'wb') as f:
            f.write(b"abcde")
        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, 4096,
                        trailers=[b"12345"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde12345")

    @unittest.skipUnless(hasattr(os, "SF_NODISKIO"),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
1984 raise
1985
1986
def supports_extended_attributes():
    """Return True if extended attributes look usable for these tests.

    The result is False when os.setxattr is missing or when setting an
    attribute on a scratch file fails with OSError.  It is also False when
    platform.release() reports a 2.6.x kernel older than 2.6.39.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: match a literal "2.6." prefix only, and
    # avoid the invalid "\d" escape of a plain string literal.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2002
2003
2004@unittest.skipUnless(supports_extended_attributes(),
2005 "no non-broken extended attribute support")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002006class ExtendedAttributeTests(unittest.TestCase):
2007
def tearDown(self):
    # Remove the scratch file the checks create at support.TESTFN.
    scratch = support.TESTFN
    support.unlink(scratch)
2010
def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
    """Run the extended attribute scenario on a fresh scratch file.

    *s* converts each attribute name before it is handed to the function
    under test; extra keyword arguments are forwarded to getxattr,
    setxattr and removexattr (listxattr is called with the path only).
    """
    path = support.TESTFN
    with open(path, "wb"):
        pass

    # Reading an attribute that was never set fails with ENODATA.
    with self.assertRaises(OSError) as caught:
        getxattr(path, s("user.test"), **kwargs)
    self.assertEqual(caught.exception.errno, errno.ENODATA)

    initial = listxattr(path)
    self.assertIsInstance(initial, list)

    # Create the attribute empty, then replace its value.
    setxattr(path, s("user.test"), b"", **kwargs)
    expected = set(initial)
    expected.add("user.test")
    self.assertEqual(set(listxattr(path)), expected)
    self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
    setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
    self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

    # XATTR_CREATE on an existing name and XATTR_REPLACE on a missing name
    # must both fail.
    with self.assertRaises(OSError) as caught:
        setxattr(path, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
    self.assertEqual(caught.exception.errno, errno.EEXIST)
    with self.assertRaises(OSError) as caught:
        setxattr(path, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
    self.assertEqual(caught.exception.errno, errno.ENODATA)

    setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
    expected.add("user.test2")
    self.assertEqual(set(listxattr(path)), expected)

    # After removal the first attribute is gone, the second one stays.
    removexattr(path, s("user.test"), **kwargs)
    with self.assertRaises(OSError) as caught:
        getxattr(path, s("user.test"), **kwargs)
    self.assertEqual(caught.exception.errno, errno.ENODATA)
    expected.remove("user.test")
    self.assertEqual(set(listxattr(path)), expected)
    self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

    # Round-trip a 1024-byte value.
    setxattr(path, s("user.test"), b"a"*1024, **kwargs)
    self.assertEqual(getxattr(path, s("user.test"), **kwargs), b"a"*1024)
    removexattr(path, s("user.test"), **kwargs)

    # Set one hundred attributes and check that all of them are listed.
    bulk_names = sorted(["user.test{}".format(i) for i in range(100)])
    for name in bulk_names:
        setxattr(path, name, b"x", **kwargs)
    self.assertEqual(set(listxattr(path)), set(initial) | set(bulk_names))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002049
Larry Hastings9cf065c2012-06-22 16:30:09 -07002050 def _check_xattrs(self, *args, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002051 def make_bytes(s):
2052 return bytes(s, "ascii")
Larry Hastings9cf065c2012-06-22 16:30:09 -07002053 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002054 support.unlink(support.TESTFN)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002055 self._check_xattrs_str(make_bytes, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002056
2057 def test_simple(self):
2058 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2059 os.listxattr)
2060
2061 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002062 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2063 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002064
2065 def test_fds(self):
2066 def getxattr(path, *args):
2067 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002068 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002069 def setxattr(path, *args):
2070 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002071 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002072 def removexattr(path, *args):
2073 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002074 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002075 def listxattr(path, *args):
2076 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002077 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002078 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2079
2080
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Check that passing bytes filenames raises DeprecationWarning."""

    def test_deprecated(self):
        import nt
        bytes_name = os.fsencode(support.TESTFN)
        # Each entry is (callable, positional arguments).
        calls = [
            (nt._getfullpathname, (bytes_name,)),
            (nt._isdir, (bytes_name,)),
            (os.access, (bytes_name, os.R_OK)),
            (os.chdir, (bytes_name,)),
            (os.chmod, (bytes_name, 0o777)),
            (os.getcwdb, ()),
            (os.link, (bytes_name, bytes_name)),
            (os.listdir, (bytes_name,)),
            (os.lstat, (bytes_name,)),
            (os.mkdir, (bytes_name,)),
            (os.open, (bytes_name, os.O_RDONLY)),
            (os.rename, (bytes_name, bytes_name)),
            (os.rmdir, (bytes_name,)),
            (os.startfile, (bytes_name,)),
            (os.stat, (bytes_name,)),
            (os.unlink, (bytes_name,)),
            (os.utime, (bytes_name,)),
        ]
        with warnings.catch_warnings():
            # Turn the warning into an exception so assertRaises can see it.
            warnings.simplefilter("error", DeprecationWarning)
            for func, args in calls:
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(bytes_name, bytes_name)
2116
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002117
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Sanity checks for os.get_terminal_size()."""

    def _get_terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args) or skip the running test.

        The test is skipped when the call fails with EINVAL or ENOTTY, or
        with any OSError on win32; every other OSError is re-raised.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stderr is discarded so that a failing stty (for example when
            # stdin is not a terminal) does not clutter the test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            # stty is missing, may not be executed, or reported an error.
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2160
2161
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed to os functions."""

    def setUp(self):
        class Str(str):
            pass

        # Collect bytes-like and str-like spellings of a file name.  The
        # test below calls os functions on them and expects every call to
        # fail with OSError.
        self.bytes_filenames = []
        self.unicode_filenames = []
        if support.TESTFN_UNENCODABLE is not None:
            decoded = support.TESTFN_UNENCODABLE
        else:
            decoded = support.TESTFN
        self.unicode_filenames.append(decoded)
        self.unicode_filenames.append(Str(decoded))
        if support.TESTFN_UNDECODABLE is not None:
            encoded = support.TESTFN_UNDECODABLE
        else:
            encoded = os.fsencode(support.TESTFN)
        self.bytes_filenames.append(encoded)
        self.bytes_filenames.append(memoryview(encoded))

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        """Every listed call must raise OSError whose filename is the input.

        Each entry of *funcs* is (filenames, function, *extra_args); the
        function is called as function(name, *extra_args) for each name.
        """
        funcs = [
            (self.filenames, os.chdir,),
            (self.filenames, os.chmod, 0o777),
            (self.filenames, os.lstat,),
            (self.filenames, os.open, os.O_RDONLY),
            (self.filenames, os.rmdir,),
            (self.filenames, os.stat,),
            (self.filenames, os.unlink,),
        ]
        if sys.platform == "win32":
            funcs.extend((
                (self.bytes_filenames, os.rename, b"dst"),
                (self.bytes_filenames, os.replace, b"dst"),
                (self.unicode_filenames, os.rename, "dst"),
                (self.unicode_filenames, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') returns an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (self.unicode_filenames, os.listdir,),
            ))
        else:
            funcs.extend((
                (self.filenames, os.listdir,),
                (self.filenames, os.rename, "dst"),
                (self.filenames, os.replace, "dst"),
            ))
        # The remaining functions only exist on some platforms.
        if hasattr(os, "chown"):
            funcs.append((self.filenames, os.chown, 0, 0))
        if hasattr(os, "lchown"):
            funcs.append((self.filenames, os.lchown, 0, 0))
        if hasattr(os, "truncate"):
            funcs.append((self.filenames, os.truncate, 0))
        if hasattr(os, "chflags"):
            funcs.append((self.filenames, os.chflags, 0))
        if hasattr(os, "lchflags"):
            funcs.append((self.filenames, os.lchflags, 0))
        if hasattr(os, "chroot"):
            funcs.append((self.filenames, os.chroot,))
        if hasattr(os, "link"):
            if sys.platform == "win32":
                funcs.append((self.bytes_filenames, os.link, b"dst"))
                funcs.append((self.unicode_filenames, os.link, "dst"))
            else:
                funcs.append((self.filenames, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs.extend((
                (self.filenames, os.listxattr,),
                (self.filenames, os.getxattr, "user.test"),
                (self.filenames, os.setxattr, "user.test", b'user'),
                (self.filenames, os.removexattr, "user.test"),
            ))
        if hasattr(os, "lchmod"):
            funcs.append((self.filenames, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            if sys.platform == "win32":
                funcs.append((self.unicode_filenames, os.readlink,))
            else:
                funcs.append((self.filenames, os.readlink,))

        for filenames, func, *func_args in funcs:
            for name in filenames:
                try:
                    func(name, *func_args)
                except OSError as err:
                    # Name the function in the failure message; otherwise
                    # a failure does not tell which call misbehaved.
                    self.assertIs(err.filename, name, str(func))
                else:
                    self.fail("No exception thrown by {}".format(func))
2258
class CPUCountTests(unittest.TestCase):
    """Sanity check for os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            # Nothing to verify when no count is reported.
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2267
@support.reap_threads
def test_main():
    """Hand every test case class of this module to support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        NonLocalSymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
        OSErrorTests,
        RemoveDirsTests,
        CPUCountTests,
    )
    support.run_unittest(*test_classes)


# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()