blob: d0dd364f89a039213aa4ef7d406e881135ab9983 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070027import decimal
28import fractions
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000029try:
30 import threading
31except ImportError:
32 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020033try:
34 import resource
35except ImportError:
36 resource = None
37
Georg Brandl2daf6ae2012-02-20 19:54:16 +010038from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000039
# Ask os.stat() for float timestamps.  os.stat_float_times() is deprecated
# (hence the suppressed DeprecationWarning) and is absent from newer
# interpreters, so only call it where it still exists.
if hasattr(os, "stat_float_times"):
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        os.stat_float_times(True)
st = os.stat(__file__)
stat_supports_subsecond = (
    # check if float and int timestamps are different
    (st.st_atime != st[7])
    or (st.st_mtime != st[8])
    or (st.st_ctime != st[9]))
49
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
# The flag is False when sys.thread_info is missing or reports no version.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))
Brian Curtineb24d742010-04-12 17:16:38 +000058
# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# The group id is only consulted on FreeBSD; everywhere else the flag is False.
if sys.platform.startswith('freebsd'):
    HAVE_WHEEL_GROUP = os.getgid() == 0
else:
    HAVE_WHEEL_GROUP = False
61
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    def setUp(self):
        # Remove any TESTFN left behind; tearDown reuses this same cleanup.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A file created read/write must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A rename that fails with TypeError must leave the reference count
        # of the path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() on the descriptor of a buffered file returns bytes.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            # Push the buffered data to the descriptor before reading it back.
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion or write fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run *args* as a command and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only and wrap the descriptor with os.fdopen(*args).
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper opens it without O_CREAT.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination file.
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
168
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200169
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    def setUp(self):
        # Fixture: directory TESTFN holding a three-byte file "f1".
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        # Shared body of the str and bytes variants: check the os.stat()
        # result for *fname* both as a sequence and through its attributes.
        if not hasattr(os, "stat"):
            self.skipTest("os.stat() is not available")

        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # Time attributes are truncated before being compared
                    # with the indexed field.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end of the sequence must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            self.skipTest("os.statvfs() is not available")

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest("os.statvfs() failed with ENOSYS")
            # Any other error is a real failure; without re-raising, the
            # checks below would run with 'result' unbound.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def _test_utime(self, filename, attr, utime, delta):
        # Helper shared by the utime tests.
        #   filename -- path whose timestamps are set and read back
        #   attr     -- callable(stat_result, name) returning a timestamp
        #   utime    -- callable(filename, times) that sets explicit times
        #   delta    -- tolerance, in attr's units, between the mtimes
        #               produced by the two "set to now" calls
        #
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Stat the path that was just touched, so st2 and st3 describe the
        # same object.
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)

    def _test_utime_ns(self, set_times_ns, test_dir=True):
        # Run _test_utime against the "*_ns" stat fields with a ten-second
        # tolerance; the directory is only exercised when test_dir is true.
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    # Skip decorators keyed on what os.utime supports on this platform.
    requires_utime_dir_fd = unittest.skipUnless(
                                os.utime in os.supports_dir_fd,
                                "dir_fd support for utime required for this test.")
    requires_utime_fd = unittest.skipUnless(
                                os.utime in os.supports_fd,
                                "fd support for utime required for this test.")
    requires_utime_nofollow_symlinks = unittest.skipUnless(
                                os.utime in os.supports_follow_symlinks,
                                "follow_symlinks support for utime required for this test.")

    @requires_utime_nofollow_symlinks
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.utime(file, ns=times, follow_symlinks=False)
        self._test_utime_ns(lutimes_ns)

    @requires_utime_fd
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.utime(f.fileno(), ns=times)
        # The setter opens its target with open(), so the directory is left out.
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        # Passing both a times tuple and ns= must be rejected.
        with self.assertRaises(ValueError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)

    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        # Helper: set_time_func(filename, atime, mtime) must store
        # timestamps that os.stat() reports back to three decimal places.
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        # Reset both timestamps so stale values cannot satisfy the check.
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        # os.stat_float_times() is deprecated and absent from newer
        # interpreters; only call it where it still exists.
        if hasattr(os, "stat_float_times"):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimens_subsecond(self):
        # Performs the same os.utime() call as test_futimes_subsecond.
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                             times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_utime_nofollow_symlinks
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime), follow_symlinks=False)
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_utimensat_subsecond(self):
        # Performs the same os.utime() call as test_futimesat_subsecond.
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                             times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    # NOTE(review): the nesting below was reconstructed from syntax because
    # the original indentation is not available -- confirm.
    if sys.platform == 'win32':
        def get_file_system(path):
            # Return the file system name that GetVolumeInformationW reports
            # for the drive holding *path*; implicitly None when the call
            # reports failure.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except FileNotFoundError:
                # file does not exist; cannot run test
                self.skipTest(r"c:\pagefile.sys does not exist")
            except OSError:
                self.fail("Could not stat pagefile.sys")

        @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
        def test_15261(self):
            # Verify that stat'ing a closed fd does not cause crash
            r, w = os.pipe()
            try:
                os.stat(r)          # should not raise error
            finally:
                os.close(r)
                os.close(w)
            with self.assertRaises(OSError) as ctx:
                os.stat(r)
            self.assertEqual(ctx.exception.errno, errno.EBADF)
497 self.assertEqual(ctx.exception.errno, errno.EBADF)
498
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000499from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000500
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol checks against os.environ."""
    type2test = None

    def setUp(self):
        # Snapshot the environment(s), then inject the reference entries.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        reference = self._reference()
        for name in reference:
            os.environ[name] = reference[name]

    def tearDown(self):
        # Put back exactly what setUp saved.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return dict(KEY1="VALUE1", KEY2="VALUE2", KEY3="VALUE3")

    def _empty_mapping(self):
        # The mapping under test is os.environ itself, emptied in place.
        environ = os.environ
        environ.clear()
        return environ

    # Bug 1110478
    @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
    def test_update2(self):
        # A variable set through update() must be visible to a child shell.
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as pipe:
            self.assertEqual(pipe.read().strip(), "World")

    @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
    def test_os_popen_iter(self):
        # The object returned by os.popen() yields its output line by line.
        command = "/bin/sh -c 'echo \"line1\nline2\nline3\"'"
        with os.popen(command) as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for pair in os.environ.items():
            for part in pair:
                self.assertEqual(type(part), str)

    def test_items(self):
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        actual = repr(os.environ)
        rendered = ['{!r}: {!r}'.format(name, text)
                    for name, text in os.environ.items()]
        self.assertEqual(actual, 'environ({{{}}})'.format(', '.join(rendered)))

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        search_dirs = ['/monty', '/python', '', '/flying/circus']
        fake_env = {'PATH': os.pathsep.join(search_dirs)}

        original_environ = os.environ
        try:
            os.environ = dict(fake_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(search_dirs, os.get_exec_path())
            self.assertSequenceEqual(search_dirs, os.get_exec_path(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(search_dirs, os.get_exec_path(fake_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            with self.assertRaises(ValueError):
                os.get_exec_path(mixed_env)

        # bytes key and/or value
        for bytes_env in ({b'PATH': b'abc'},
                          {b'PATH': 'abc'},
                          {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(bytes_env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            encoded = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            self.skipTest(
                "U+20AC character is not encodable to %s" % (fs_encoding,))
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], encoded)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        decoded = raw.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], decoded)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key, expected = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            key, expected = 'key=', OSError
        with self.assertRaises(expected):
            del os.environ[key]

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # Lookup and deletion must both raise KeyError carrying the very
        # same key object, with the internal exception context suppressed.
        for operation in (os.environ.__getitem__, os.environ.__delitem__):
            with self.assertRaises(KeyError) as caught:
                operation(missing)
            self.assertIs(caught.exception.args[0], missing)
            self.assertTrue(caught.exception.__suppress_context__)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200654 self.assertTrue(cm.exception.__suppress_context__)
655
Victor Stinner6d101392013-04-14 16:35:04 +0200656
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def setUp(self):
        """Create the directory tree walked by the tests.

        This only builds the fixture.  The assertions live in the test_*
        methods so that unittest collects and reports them as tests, and so
        that a failing assertion cannot prevent tearDown() from running.
        """
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(t2_path, "tmp4")
        broken_link_path = join(sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        # What os.walk() is expected to report for SUB2 depends on whether
        # the two symlinks could be created.
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        """Walk top-down and check every (root, dirs, files) triple."""
        visited = list(os.walk(self.walk_path))
        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TEST1, SUB1, SUB11, SUB2
        #     flipped:  TEST1, SUB2, SUB1, SUB11
        flipped = visited[0][1][0] != "SUB1"
        visited[0][1].sort()
        visited[3 - 2 * flipped][-1].sort()
        self.assertEqual(visited[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(visited[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        """Removing an entry from dirs must stop os.walk() descending into it."""
        visited = []
        for root, dirs, files in os.walk(self.walk_path):
            visited.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to visited!
                dirs.remove('SUB1')
        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0], (self.walk_path, ["SUB2"], ["tmp1"]))
        visited[1][-1].sort()
        self.assertEqual(visited[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        """Walk bottom-up and check every (root, dirs, files) triple."""
        visited = list(os.walk(self.walk_path, topdown=False))
        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TEST1
        #     flipped:  SUB2, SUB11, SUB1, TEST1
        flipped = visited[3][1][0] != "SUB1"
        visited[3][1].sort()
        visited[2 - 2 * flipped][-1].sort()
        self.assertEqual(visited[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[flipped], (self.sub11_path, [], []))
        self.assertEqual(visited[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 - 2 * flipped], self.sub2_tree)

    def test_walk_symlink(self):
        """With followlinks=True the walk must enter the directory symlink."""
        if not support.can_symlink():
            self.skipTest("need symlink support")
        # Walk, following symlinks.
        for root, dirs, files in os.walk(self.walk_path, followlinks=True):
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is listed in dirs but has to be
                # removed like a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000771
Charles-François Natali7372b062012-02-05 15:15:38 +0100772
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """Compare os.fwalk() results with os.walk() results.

        walk_kwargs and fwalk_kwargs are the keyword arguments passed to
        os.walk() and os.fwalk() respectively.  Both are copied before the
        topdown / symlink-following flags are filled in, so the caller's
        dicts are left untouched.  Every combination of the two flags is
        exercised.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            # Order of entries is not significant, hence the sets.
            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide the real error behind a NameError.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup, done bottom-up through the directory descriptors that
        # fwalk() yields
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Do not follow symlinks: a link to a directory has to be
                # unlinked, not rmdir'ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
846
847
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), plus an argument-type check for os.chown()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion fails, so a
        # failure here cannot leak into the tests that run afterwards.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
    def test_chown_uid_gid_arguments_must_be_index(self):
        # Named "st" rather than "stat" so the stat module stays reachable.
        st = os.stat(support.TESTFN)
        uid = st.st_uid
        gid = st.st_gid
        # None of these is an integer type, so each must be rejected.
        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            with self.assertRaises(OSError):
                # Should fail when the bit is not already set when demanded.
                os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        # A regular file sitting at the target path must make makedirs()
        # fail whatever the value of exist_ok.
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
938
Andrew Svetlov405faed2012-12-25 12:18:09 +0200939
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested(self):
        """Create TESTFN/dira/dirb and return (dira, dirb)."""
        outer = os.path.join(support.TESTFN, 'dira')
        inner = os.path.join(outer, 'dirb')
        os.mkdir(outer)
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Nothing but empty directories: the whole chain goes away.
        outer, inner = self._make_nested()
        os.removedirs(inner)
        for path in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(path))

    def test_remove_partial(self):
        # A file in dira stops the removal there: only dirb disappears.
        outer, inner = self._make_nested()
        with open(os.path.join(outer, 'file.txt'), 'w') as f:
            f.write('text')
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for path in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(path))

    def test_remove_nothing(self):
        # A file in dirb itself: removedirs() fails and removes nothing.
        outer, inner = self._make_nested()
        with open(os.path.join(inner, 'file.txt'), 'w') as f:
            f.write('text')
        self.assertRaises(OSError, os.removedirs, inner)
        for path in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(path))
981
982
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        # Data written to os.devnull must be accepted...
        with open(os.devnull, 'wb') as sink:
            sink.write(b'hello')
            # explicit close before the with block closes the file again
            sink.close()
        # ...and reading from it must yield nothing.
        with open(os.devnull, 'rb') as source:
            content = source.read()
            self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000990
Andrew Svetlov405faed2012-12-25 12:18:09 +0200991
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        # The result must have exactly the requested length.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        # Two successive calls are expected to return different data.
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() output from a child process.

        The child writes the raw bytes to its stdout; the length of what
        comes back is checked against *count* before it is returned.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare with the requested size, not a hard-coded 16, so the
        # helper is correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes are expected to get different data.
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if the file
        # descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001043
1044
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If defpath is not None, os.defpath is replaced by it as well.  The
    context manager yields the list of recorded calls; os.execv, os.execve
    and os.defpath are restored on exit, whether or not the body raised.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals before entering the try block: if one of these
    # lookups failed inside it, the finally clause would hit an unbound
    # name and hide the real error.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1077
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with str or bytes program names.

        test_type is either str or bytes and selects the type of the
        program name.  os.execv and os.execve are replaced by recording
        stubs for the duration of each check.
        """
        program_path = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # the recorded execve path is bytes everywhere except on
            # Windows ("nt")
            native_fullpath = fullpath if os.name == "nt" else os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # the bytes variant is not exercised on Windows
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001141
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001142
class Win32ErrorTests(unittest.TestCase):
    """Each os call below is expected to fail with OSError on TESTFN.

    NOTE(review): apart from test_mkdir, which creates the file itself,
    these presumably rely on TESTFN not existing -- confirm.
    """

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir() must fail while a file of the same name exists.
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)
        finally:
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001166
class TestInvalidFD(unittest.TestCase):
    """Check how fd-based os functions behave on an invalid descriptor.

    Functions that this platform's os module does not provide are reported
    as skipped rather than silently counted as passing.
    """

    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    # "close" is deliberately left out of the list: it doesn't raise an
    # exception on some platforms.
    def get_single(f):
        # Build a test method for the os function named f, which takes the
        # file descriptor as its only argument.
        def helper(self):
            if not hasattr(os, f):
                self.skipTest("test needs os.%s()" % f)
            self.check(getattr(os, f))
        return helper
    # Inside a class body, locals() is the namespace the class is built
    # from, so this adds one test_<name> method per entry.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require an OSError with errno EBADF."""
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() reports False for a bad descriptor instead of raising.
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try: os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # os.pathconf is handed the bad descriptor too.
        self.check(os.pathconf, "PC_NAME_MAX")
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # os.truncate is handed the bad descriptor too.
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001246
Brian Curtin1b9df392010-11-24 20:24:31 +00001247
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # test_unicode_name() rebinds file1/file2, so clean up whatever
        # the attributes point at now.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        """Create file1, hard-link it to file2 and check both are one file."""
        with open(file1, "w") as f1:
            f1.write("test")

        # Any DeprecationWarning raised by the os.link() call is ignored.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() encodes the names with the filesystem encoding.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1284
Thomas Wouters477c8d52006-05-27 19:21:47 +00001285if sys.platform != 'win32':
1286 class Win32ErrorTests(unittest.TestCase):
1287 pass
1288
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001289 class PosixUidGidTests(unittest.TestCase):
1290 if hasattr(os, 'setuid'):
1291 def test_setuid(self):
1292 if os.getuid() != 0:
Andrew Svetlov8b33dd82012-12-24 19:58:48 +02001293 self.assertRaises(OSError, os.setuid, 0)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001294 self.assertRaises(OverflowError, os.setuid, 1<<32)
1295
1296 if hasattr(os, 'setgid'):
1297 def test_setgid(self):
Stefan Krahebee49a2013-01-17 15:31:00 +01001298 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
Andrew Svetlov8b33dd82012-12-24 19:58:48 +02001299 self.assertRaises(OSError, os.setgid, 0)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001300 self.assertRaises(OverflowError, os.setgid, 1<<32)
1301
1302 if hasattr(os, 'seteuid'):
1303 def test_seteuid(self):
1304 if os.getuid() != 0:
Andrew Svetlov8b33dd82012-12-24 19:58:48 +02001305 self.assertRaises(OSError, os.seteuid, 0)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001306 self.assertRaises(OverflowError, os.seteuid, 1<<32)
1307
1308 if hasattr(os, 'setegid'):
1309 def test_setegid(self):
Stefan Krahebee49a2013-01-17 15:31:00 +01001310 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
Andrew Svetlov8b33dd82012-12-24 19:58:48 +02001311 self.assertRaises(OSError, os.setegid, 0)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001312 self.assertRaises(OverflowError, os.setegid, 1<<32)
1313
1314 if hasattr(os, 'setreuid'):
1315 def test_setreuid(self):
1316 if os.getuid() != 0:
Andrew Svetlov8b33dd82012-12-24 19:58:48 +02001317 self.assertRaises(OSError, os.setreuid, 0, 0)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001318 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1319 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001320
1321 def test_setreuid_neg1(self):
1322 # Needs to accept -1. We run this in a subprocess to avoid
1323 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001324 subprocess.check_call([
1325 sys.executable, '-c',
1326 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001327
1328 if hasattr(os, 'setregid'):
1329 def test_setregid(self):
Stefan Krahebee49a2013-01-17 15:31:00 +01001330 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
Andrew Svetlov8b33dd82012-12-24 19:58:48 +02001331 self.assertRaises(OSError, os.setregid, 0, 0)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001332 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1333 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001334
1335 def test_setregid_neg1(self):
1336 # Needs to accept -1. We run this in a subprocess to avoid
1337 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001338 subprocess.check_call([
1339 sys.executable, '-c',
1340 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001341
1342 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001343 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001344 if support.TESTFN_UNENCODABLE:
1345 self.dir = support.TESTFN_UNENCODABLE
Victor Stinner8b219b22012-11-06 23:23:43 +01001346 elif support.TESTFN_NONASCII:
1347 self.dir = support.TESTFN_NONASCII
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001348 else:
1349 self.dir = support.TESTFN
1350 self.bdir = os.fsencode(self.dir)
1351
1352 bytesfn = []
1353 def add_filename(fn):
1354 try:
1355 fn = os.fsencode(fn)
1356 except UnicodeEncodeError:
1357 return
1358 bytesfn.append(fn)
1359 add_filename(support.TESTFN_UNICODE)
1360 if support.TESTFN_UNENCODABLE:
1361 add_filename(support.TESTFN_UNENCODABLE)
Victor Stinner8b219b22012-11-06 23:23:43 +01001362 if support.TESTFN_NONASCII:
1363 add_filename(support.TESTFN_NONASCII)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001364 if not bytesfn:
1365 self.skipTest("couldn't create any non-ascii filename")
1366
1367 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001368 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001369 try:
1370 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +02001371 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +00001372 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001373 if fn in self.unicodefn:
1374 raise ValueError("duplicate filename")
1375 self.unicodefn.add(fn)
1376 except:
1377 shutil.rmtree(self.dir)
1378 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001379
1380 def tearDown(self):
1381 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001382
1383 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001384 expected = self.unicodefn
1385 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001386 self.assertEqual(found, expected)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001387 # test listdir without arguments
1388 current_directory = os.getcwd()
1389 try:
1390 os.chdir(os.sep)
1391 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1392 finally:
1393 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001394
1395 def test_open(self):
1396 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001397 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +00001398 f.close()
1399
Victor Stinnere4110dc2013-01-01 23:05:55 +01001400 @unittest.skipUnless(hasattr(os, 'statvfs'),
1401 "need os.statvfs()")
1402 def test_statvfs(self):
1403 # issue #9645
1404 for fn in self.unicodefn:
1405 # should not fail with file not found error
1406 fullname = os.path.join(self.dir, fn)
1407 os.statvfs(fullname)
1408
Martin v. Löwis011e8422009-05-05 04:43:17 +00001409 def test_stat(self):
1410 for fn in self.unicodefn:
1411 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001412else:
1413 class PosixUidGidTests(unittest.TestCase):
1414 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001415 class Pep383Tests(unittest.TestCase):
1416 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001417
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows: exit codes and console events."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for at most attempts * 0.1 seconds.
        count, attempts = 0, 100
        while count < attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a helper script and expect it to exit.

        *name* is only used in the failure message.  The helper script is
        started in its own process group and is given the tag name of a
        one-byte shared mmap; this method waits until that byte becomes 1
        before sending the event.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping however the test ends.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, attempts = 0, 100
        while count < attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # It may already have exited on its own, in which case there is
            # nothing left to kill.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # code of 0 also means the child has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1532
1533
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic link behaviour of os.symlink(), os.stat() and friends on Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: the targets exist and no link is left over from an
        # earlier run.  unittest assertions (unlike bare ``assert``) are still
        # checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): the missing link has no target, so exists() is False.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same comparison is repeated with the link name encoded as bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so that the cleanup below can always
        # refer to it, even when creating the directories fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so that a failure early
            # in the test is not masked by a second error raised from here.
            if os.path.lexists(file1):
                os.remove(file1)
            if os.path.isdir(level1):
                shutil.rmtree(level1)
1651
Brian Curtind40e6f72010-07-08 21:39:08 +00001652
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        # Raw docstring: the backslash in the diagram is literal text, not
        # the start of an escape sequence.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion, so the check still runs under "python -O"
        # (a bare assert statement would be compiled away).
        self.assertTrue(os.path.isdir(src))
1683
1684
class FSEncodingTests(unittest.TestCase):
    """Round-trip checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() must come
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # fsdecode(fsencode(x)) == x for every name that can be encoded
        samples = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in samples:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not expressible in the filesystem encoding: nothing to check.
                continue
            decoded = os.fsdecode(encoded)
            self.assertEqual(decoded, name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001698
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001699
Brett Cannonefb00c02012-02-29 18:31:31 -05001700
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        result = os.device_encoding(missing_fd)
        self.assertIsNone(result)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # With fd 0 attached to a terminal an encoding name must be reported,
        # and it has to be one the codecs machinery can look up.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        codec_info = codecs.lookup(name)
        self.assertTrue(codec_info)
1714
1715
class PidTests(unittest.TestCase):
    """Tests for process-id related functions of the os module."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid, which must be our own pid
        # because we are the parent of our subprocess.
        child_code = 'import os; print(os.getppid())'
        argv = [sys.executable, '-c', child_code]
        child = subprocess.Popen(argv, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        self.assertEqual(int(output), os.getpid())
1725
1726
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots.  It is skipped for now so that the buildbots can move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Tests for os.getlogin() (currently always skipped, see above)."""

    def test_getlogin(self):
        # The login name must not be empty.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
1735
1736
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        which = os.PRIO_PROCESS
        base = os.getpriority(which, os.getpid())
        # Raise the nice value by one and read it back.
        os.setpriority(which, os.getpid(), base + 1)
        try:
            new_prio = os.getpriority(which, os.getpid())
            if base >= 19 and new_prio <= 19:
                self.skipTest(
   "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to put the original priority back.  Being refused with
            # EACCES is tolerated; any other error is propagated.
            try:
                os.setpriority(which, os.getpid(), base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1759
1760
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket served by an asyncore loop on its own thread.

        Each accepted connection is wrapped in a Handler, which greets the
        peer with b"220 ready\\r\\n" and then buffers everything it receives.
        The most recent Handler is available as ``handler_instance``.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that accumulates the received bytes."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception being handled instead of letting it
                # be swallowed.
                raise

        def __init__(self, address):
            """Bind to *address* (a (host, port) pair) and start listening."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Read the address back so that a requested port of 0 is
            # replaced by the port that was actually assigned.
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the server thread and wait until its loop is running."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop to finish and wait for the thread to exit."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # The context manager releases the lock even when the loop
                # iteration raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception being handled instead of letting it be
            # swallowed.
            raise
1844
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001845
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001846@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001847@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1848class TestSendfile(unittest.TestCase):
1849
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001850 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001851 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001852 not sys.platform.startswith("solaris") and \
1853 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001854
1855 @classmethod
1856 def setUpClass(cls):
1857 with open(support.TESTFN, "wb") as f:
1858 f.write(cls.DATA)
1859
1860 @classmethod
1861 def tearDownClass(cls):
1862 support.unlink(support.TESTFN)
1863
1864 def setUp(self):
1865 self.server = SendfileTestServer((support.HOST, 0))
1866 self.server.start()
1867 self.client = socket.socket()
1868 self.client.connect((self.server.host, self.server.port))
1869 self.client.settimeout(1)
1870 # synchronize by waiting for "220 ready" response
1871 self.client.recv(1024)
1872 self.sockno = self.client.fileno()
1873 self.file = open(support.TESTFN, 'rb')
1874 self.fileno = self.file.fileno()
1875
1876 def tearDown(self):
1877 self.file.close()
1878 self.client.close()
1879 if self.server.running:
1880 self.server.stop()
1881
1882 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1883 """A higher level wrapper representing how an application is
1884 supposed to use sendfile().
1885 """
1886 while 1:
1887 try:
1888 if self.SUPPORT_HEADERS_TRAILERS:
1889 return os.sendfile(sock, file, offset, nbytes, headers,
1890 trailers)
1891 else:
1892 return os.sendfile(sock, file, offset, nbytes)
1893 except OSError as err:
1894 if err.errno == errno.ECONNRESET:
1895 # disconnected
1896 raise
1897 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1898 # we have to retry send data
1899 continue
1900 else:
1901 raise
1902
1903 def test_send_whole_file(self):
1904 # normal send
1905 total_sent = 0
1906 offset = 0
1907 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001908 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001909 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1910 if sent == 0:
1911 break
1912 offset += sent
1913 total_sent += sent
1914 self.assertTrue(sent <= nbytes)
1915 self.assertEqual(offset, total_sent)
1916
1917 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001918 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001919 self.client.close()
1920 self.server.wait()
1921 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001922 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001923 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001924
1925 def test_send_at_certain_offset(self):
1926 # start sending a file at a certain offset
1927 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001928 offset = len(self.DATA) // 2
1929 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001930 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001931 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001932 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1933 if sent == 0:
1934 break
1935 offset += sent
1936 total_sent += sent
1937 self.assertTrue(sent <= nbytes)
1938
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001939 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001940 self.client.close()
1941 self.server.wait()
1942 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001943 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001944 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001945 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001946 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001947
1948 def test_offset_overflow(self):
1949 # specify an offset > file size
1950 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001951 try:
1952 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1953 except OSError as e:
1954 # Solaris can raise EINVAL if offset >= file length, ignore.
1955 if e.errno != errno.EINVAL:
1956 raise
1957 else:
1958 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001959 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001960 self.client.close()
1961 self.server.wait()
1962 data = self.server.handler_instance.get_data()
1963 self.assertEqual(data, b'')
1964
1965 def test_invalid_offset(self):
1966 with self.assertRaises(OSError) as cm:
1967 os.sendfile(self.sockno, self.fileno, -1, 4096)
1968 self.assertEqual(cm.exception.errno, errno.EINVAL)
1969
1970 # --- headers / trailers tests
1971
1972 if SUPPORT_HEADERS_TRAILERS:
1973
1974 def test_headers(self):
1975 total_sent = 0
1976 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1977 headers=[b"x" * 512])
1978 total_sent += sent
1979 offset = 4096
1980 nbytes = 4096
1981 while 1:
1982 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1983 offset, nbytes)
1984 if sent == 0:
1985 break
1986 total_sent += sent
1987 offset += sent
1988
1989 expected_data = b"x" * 512 + self.DATA
1990 self.assertEqual(total_sent, len(expected_data))
1991 self.client.close()
1992 self.server.wait()
1993 data = self.server.handler_instance.get_data()
1994 self.assertEqual(hash(data), hash(expected_data))
1995
1996 def test_trailers(self):
1997 TESTFN2 = support.TESTFN + "2"
Victor Stinner5e4d6392013-08-15 11:57:02 +02001998 file_data = b"abcdef"
Brett Cannonb6376802011-03-15 17:38:22 -04001999 with open(TESTFN2, 'wb') as f:
Victor Stinner5e4d6392013-08-15 11:57:02 +02002000 f.write(file_data)
Brett Cannonb6376802011-03-15 17:38:22 -04002001 with open(TESTFN2, 'rb')as f:
2002 self.addCleanup(os.remove, TESTFN2)
Victor Stinner5e4d6392013-08-15 11:57:02 +02002003 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2004 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002005 self.client.close()
2006 self.server.wait()
2007 data = self.server.handler_instance.get_data()
Victor Stinner5e4d6392013-08-15 11:57:02 +02002008 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002009
2010 if hasattr(os, "SF_NODISKIO"):
2011 def test_flags(self):
2012 try:
2013 os.sendfile(self.sockno, self.fileno, 0, 4096,
2014 flags=os.SF_NODISKIO)
2015 except OSError as err:
2016 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2017 raise
2018
2019
def supports_extended_attributes():
    """Return True if extended attributes can be tested on this system.

    The result is False when os.setxattr is missing, when setting a
    "user.test" attribute on a scratch file fails with OSError, or when
    platform.release() reports a 2.6.x kernel older than 2.6.39.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: match a literal "2.6.<minor>" prefix only.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2035
2036
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _assert_errno(self, expected_errno, func, *args, **kwargs):
        # Call func(*args, **kwargs); it must raise an OSError carrying
        # the given errno.
        with self.assertRaises(OSError) as caught:
            func(*args, **kwargs)
        self.assertEqual(caught.exception.errno, expected_errno)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run the whole get/set/remove/list scenario on a fresh file.
        # *s* converts attribute names to the type under test (str or
        # bytes); **kwargs is forwarded to get/set/removexattr.
        path = support.TESTFN
        with open(path, "wb"):
            pass

        # Nothing is set yet.
        self._assert_errno(errno.ENODATA,
                           getxattr, path, s("user.test"), **kwargs)
        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create an empty attribute and read it back.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial) | {"user.test"}
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")

        # XATTR_REPLACE works on an existing attribute...
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")
        # ...XATTR_CREATE refuses an existing one...
        self._assert_errno(errno.EEXIST,
                           setxattr, path, s("user.test"), b"bye",
                           os.XATTR_CREATE, **kwargs)
        # ...and XATTR_REPLACE refuses a missing one.
        self._assert_errno(errno.ENODATA,
                           setxattr, path, s("user.test2"), b"bye",
                           os.XATTR_REPLACE, **kwargs)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removing an attribute makes it unreadable and unlisted.
        removexattr(path, s("user.test"), **kwargs)
        self._assert_errno(errno.ENODATA,
                           getxattr, path, s("user.test"), **kwargs)
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # A 1 KiB value round-trips.
        big_value = b"a" * 1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)

        # Many attributes at once.
        names = sorted(["user.test{}".format(i) for i in range(100)])
        for attr_name in names:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial).union(names))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario with str names, then again with bytes names on
        # a freshly recreated file.
        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(lambda text: bytes(text, "ascii"),
                               *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr,
                           os.removexattr, os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr,
                           os.removexattr, os.listxattr,
                           follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call goes through a file descriptor
        # obtained by opening the path.
        def getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb") as stream:
                os.setxattr(stream.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb") as stream:
                os.removexattr(stream.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2112
2113
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    def test_deprecated(self):
        # With DeprecationWarning turned into an error, each of these
        # calls with a bytes filename must raise it.
        import nt
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            calls = [
                (nt._getfullpathname, filename),
                (nt._isdir, filename),
                (os.access, filename, os.R_OK),
                (os.chdir, filename),
                (os.chmod, filename, 0o777),
                (os.getcwdb,),
                (os.link, filename, filename),
                (os.listdir, filename),
                (os.lstat, filename),
                (os.mkdir, filename),
                (os.open, filename, os.O_RDONLY),
                (os.rename, filename, filename),
                (os.rmdir, filename),
                (os.startfile, filename),
                (os.stat, filename),
                (os.unlink, filename),
                (os.utime, filename),
            ]
            for call in calls:
                func, args = call[0], call[1:]
                with self.assertRaises(DeprecationWarning):
                    func(*args)

    @support.skip_unless_symlink
    def test_symlink(self):
        # os.symlink() with bytes arguments must raise the warning too.
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(filename, filename)
2149
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002150
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _query_or_skip(self, query):
        # Return query(); skip the test when it raises an OSError that is
        # expected for a missing terminal, re-raise any other OSError.
        try:
            return query()
        except OSError as exc:
            expected_failure = (
                sys.platform == "win32"
                or exc.errno in (errno.EINVAL, errno.ENOTTY)
            )
            if not expected_failure:
                raise
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._query_or_skip(lambda: os.get_terminal_size())

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            output = subprocess.check_output(['stty', 'size'])
            fields = output.decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        # stty prints "rows columns"; get_terminal_size() reports
        # (columns, lines), hence the swap.
        expected = (int(fields[1]), int(fields[0]))

        # fileno() is evaluated inside the helper's try block.
        actual = self._query_or_skip(
            lambda: os.get_terminal_size(sys.__stdin__.fileno()))
        self.assertEqual(expected, actual)
2193
2194
class OSErrorTests(unittest.TestCase):
    def setUp(self):
        # Build the filename variants passed to the os functions: plain
        # str and a str subclass, plus bytes and a memoryview of them.
        class Str(str):
            pass

        decoded = (support.TESTFN_UNENCODABLE
                   if support.TESTFN_UNENCODABLE is not None
                   else support.TESTFN)
        self.unicode_filenames = [decoded, Str(decoded)]

        encoded = (support.TESTFN_UNDECODABLE
                   if support.TESTFN_UNDECODABLE is not None
                   else os.fsencode(support.TESTFN))
        self.bytes_filenames = [encoded, memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        # Each entry is (filenames to try, function, *extra arguments).
        # Every call must raise an OSError whose filename attribute is
        # the very object that was passed in.
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames
        on_windows = sys.platform == "win32"

        funcs = [
            (every, os.chdir),
            (every, os.chmod, 0o777),
            (every, os.lstat),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir),
            (every, os.stat),
            (every, os.unlink),
        ]
        if on_windows:
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') returns
                # an empty list (instead of failing), whereas
                # os.listdir(b'\xff') raises a FileNotFoundError. It looks
                # like a Windows bug: b'\xe7' directory does not exist,
                # FindFirstFileA(b'\xe7') fails with ERROR_FILE_NOT_FOUND
                # (2), instead of ERROR_PATH_NOT_FOUND (3).
                (as_text, os.listdir),
            ]
        else:
            funcs += [
                (every, os.listdir),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Optional functions that accept every filename variant.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((as_text if on_windows else every, os.readlink))

        for candidates, func, *extra_args in funcs:
            for name in candidates:
                try:
                    func(name, *extra_args)
                except OSError as exc:
                    self.assertIs(exc.filename, name)
                    continue
                self.fail("No exception thrown by {}".format(func))
2291
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns either None (count unknown, test is
        # skipped) or a strictly positive int.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2300
Victor Stinnerdaf45552013-08-28 00:53:59 +02002301
class FDInheritanceTests(unittest.TestCase):
    def _open_this_file(self):
        # Open this source file read-only; the descriptor is closed by a
        # cleanup once the test is over.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_inheritable(self):
        fd = self._open_this_file()
        for flag in (False, True):
            os.set_inheritable(fd, flag)
            self.assertEqual(os.get_inheritable(fd), flag)

    def test_set_inheritable(self):
        fd = self._open_this_file()
        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    def test_open(self):
        # Descriptors returned by os.open() are non-inheritable.
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        # Both ends of a pipe are non-inheritable.
        ends = os.pipe()
        for fd in ends:
            self.addCleanup(os.close, fd)
        for fd in ends:
            self.assertEqual(os.get_inheritable(fd), False)

    def test_dup(self):
        # The duplicate made by os.dup() is non-inheritable.
        source_fd = self._open_this_file()

        copy_fd = os.dup(source_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertEqual(os.get_inheritable(copy_fd), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = self._open_this_file()

        scenarios = (
            ({}, True),                       # inheritable by default
            ({"inheritable": False}, False),  # force non-inheritable
        )
        for dup2_kwargs, expected in scenarios:
            target_fd = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(source_fd, target_fd, **dup2_kwargs)
                self.assertEqual(os.get_inheritable(target_fd), expected)
            finally:
                os.close(target_fd)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        # Both descriptors of a pseudo-terminal pair are non-inheritable.
        pair = os.openpty()
        for fd in pair:
            self.addCleanup(os.close, fd)
        for fd in pair:
            self.assertEqual(os.get_inheritable(fd), False)
2365
2366
@support.reap_threads
def test_main():
    # Run every test case class of this module, in this order.
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        NonLocalSymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
        OSErrorTests,
        RemoveDirsTests,
        CPUCountTests,
        FDInheritanceTests,
    )
    support.run_unittest(*test_classes)


# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()