blob: 39b0e80125b9edfeec4bea9ad1efdf8b421f40da [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Larry Hastingsa27b83a2013-08-08 00:19:50 -070027import decimal
28import fractions
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000029try:
30 import threading
31except ImportError:
32 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020033try:
34 import resource
35except ImportError:
36 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020037try:
38 import fcntl
39except ImportError:
40 fcntl = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020041
Georg Brandl2daf6ae2012-02-20 19:54:16 +010042from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000043
# Ask os.stat() for float timestamps before probing for sub-second support.
# os.stat_float_times() is deprecated (hence the warning filter) and is absent
# from newer Python releases, so only call it where it still exists.
if hasattr(os, "stat_float_times"):
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        os.stat_float_times(True)
st = os.stat(__file__)
stat_supports_subsecond = (
    # check if float and int timestamps are different
    (st.st_atime != st[7])
    or (st.st_mtime != st[8])
    or (st.st_ctime != st[9]))
53
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
#
# The flag is False whenever sys.thread_info is missing or reports no
# (or an empty) library version string.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if getattr(getattr(sys, 'thread_info', None), 'version', None)
    else False
)
Brian Curtineb24d742010-04-12 17:16:38 +000062
# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD; everywhere else the flag is False.
HAVE_WHEEL_GROUP = (
    os.getgid() == 0 if sys.platform.startswith('freebsd') else False
)
65
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests of low-level os file functions that create support.TESTFN.

    setUp (also installed as tearDown) removes support.TESTFN if it exists,
    so every test starts and ends without that file.
    """

    def setUp(self):
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A freshly created file must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A rename rejected with TypeError must leave the reference count
        # of the path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # Rewind the raw descriptor to the start of the file.
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails;
            # otherwise it leaks and tearDown may be unable to unlink TESTFN.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Helper: run *args as a command and require exit status 0.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Helper: open TESTFN read-only, wrap the descriptor with
        # os.fdopen(fd, *args) and close the resulting file object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination and remove
        # the source.
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
172
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200173
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests of os.stat()/os.statvfs() result objects and of os.utime().

    setUp creates the directory support.TESTFN containing one three-byte
    file "f1" (self.fname); tearDown removes both again.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        # Context manager: the handle is closed even if the write fails.
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        # Shared body of the str and bytes variants: *fname* is the path
        # handed to os.stat().
        if not hasattr(os, "stat"):
            return

        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # The tuple slots hold integer timestamps; truncate the
                    # attribute value before comparing.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Out-of-range indexing must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Both outcomes are
        # tolerated here: the call may succeed or raise TypeError.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    @unittest.skipUnless(hasattr(os, "statvfs"), "requires os.statvfs()")
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest("os.statvfs() failed with ENOSYS")
            # Any other failure is a real error; swallowing it would only
            # surface later as a NameError on the unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  Both outcomes are
        # tolerated here: the call may succeed or raise TypeError.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def _test_utime(self, filename, attr, utime, delta):
        # Helper shared by the utime tests.
        #   filename: path whose timestamps are set and read back
        #   attr:     callable (stat_result, name) -> timestamp value
        #   utime:    callable (filename, times) that sets the timestamps
        #   delta:    tolerance, in the unit returned by attr, for the
        #             "set to current time" comparison
        #
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Stat the file that was just touched (not unconditionally TESTFN),
        # so that st2 and st3 describe the same object.
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)


    def _test_utime_ns(self, set_times_ns, test_dir=True):
        # Helper: run _test_utime against the *_ns stat fields with a
        # ten-second tolerance, on the file and (optionally) the directory.
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    # Skip decorators for the optional os.utime() capabilities.
    requires_utime_dir_fd = unittest.skipUnless(
                                os.utime in os.supports_dir_fd,
                                "dir_fd support for utime required for this test.")
    requires_utime_fd = unittest.skipUnless(
                                os.utime in os.supports_fd,
                                "fd support for utime required for this test.")
    requires_utime_nofollow_symlinks = unittest.skipUnless(
                                os.utime in os.supports_follow_symlinks,
                                "follow_symlinks support for utime required for this test.")

    @requires_utime_nofollow_symlinks
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.utime(file, ns=times, follow_symlinks=False)
        self._test_utime_ns(lutimes_ns)

    @requires_utime_fd
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.utime(f.fileno(), ns=times)
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        # Passing both a times tuple and ns= must be rejected.
        with self.assertRaises(ValueError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)


    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        # Helper: set_time_func(filename, atime, mtime) must store the
        # fractional timestamps so that os.stat() reads them back to
        # three decimal places.
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        # os.stat_float_times() is deprecated and absent from newer Python
        # releases; only call it where it still exists.
        if hasattr(os, "stat_float_times"):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                             times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_utime_nofollow_symlinks
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime), follow_symlinks=False)
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                             times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Called at class-definition time (no self): return the file
            # system name reported for the drive holding *path*, or None
            # when GetVolumeInformationW() fails.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except FileNotFoundError:
                pass # file does not exist; cannot run test
            except OSError:
                self.fail("Could not stat pagefile.sys")

        @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
        def test_15261(self):
            # Verify that stat'ing a closed fd does not cause crash
            r, w = os.pipe()
            try:
                os.stat(r)          # should not raise error
            finally:
                os.close(r)
                os.close(w)
            with self.assertRaises(OSError) as ctx:
                os.stat(r)
            self.assertEqual(ctx.exception.errno, errno.EBADF)
502
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000503from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000504
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests against os.environ.

    setUp snapshots os.environ (and os.environb where supported) and loads
    the reference entries; tearDown restores the snapshot.
    """
    type2test = None

    def setUp(self):
        # Keep copies of the real environment so tearDown can restore it.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for name, val in self._reference().items():
            os.environ[name] = val

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        # Entries loaded into os.environ by setUp.
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        # The "empty mapping" is os.environ itself, cleared in place.
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
    def test_update2(self):
        # A variable set through os.environ.update() must be visible to a
        # child shell.
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            output = popen.read()
            self.assertEqual(output.strip(), "World")

    @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
    def test_os_popen_iter(self):
        # The object returned by os.popen() iterates line by line and then
        # raises StopIteration.
        command = "/bin/sh -c 'echo \"line1\nline2\nline3\"'"
        with os.popen(command) as popen:
            lines = iter(popen)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, val in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        # Every reference entry must be readable back through .get().
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        pairs = ('{!r}: {!r}'.format(key, value)
                 for key, value in env.items())
        expected = 'environ({{{}}})'.format(', '.join(pairs))
        self.assertEqual(repr(env), expected)

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily rebind os.environ to a plain dict to check that
        # get_exec_path() defaults to os.environ.
        saved_environ = os.environ
        os.environ = dict(test_env)
        try:
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        for env in ({b'PATH': b'abc'}, {b'PATH': 'abc'}, {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                fs_encoding,)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key, expected = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            key, expected = 'key=', OSError
        self.assertRaises(expected, os.environ.__delitem__, key)

    def test_key_type(self):
        # Looking up or deleting a missing key must raise KeyError carrying
        # the very key object that was passed in, with the exception
        # context suppressed.
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        with self.assertRaises(KeyError) as lookup:
            os.environ[missing]
        self.assertIs(lookup.exception.args[0], missing)
        self.assertTrue(lookup.exception.__suppress_context__)

        with self.assertRaises(KeyError) as removal:
            del os.environ[missing]
        self.assertIs(removal.exception.args[0], missing)
        self.assertTrue(removal.exception.__suppress_context__)
659
Victor Stinner6d101392013-04-14 16:35:04 +0200660
class WalkTests(unittest.TestCase):
    """Tests for os.walk().

    setUp() only builds the directory tree; the traversal checks live in
    separate test methods so that they are reported as tests and so that
    tearDown() still runs when one of them fails.
    """

    def setUp(self):
        """Create the directory tree walked by the tests.

        The paths and the expected os.walk() entry for SUB2 are stored on
        the instance for use by the test methods.
        """
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        self.sub2_path = join(self.walk_path, "SUB2")
        self.link_path = join(self.sub2_path, "link")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(self.sub2_path, "tmp3")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(self.sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(self.sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            # os.walk() reports the directory symlink under dirs and the
            # dangling one under files.
            self.sub2_tree = (self.sub2_path, ["link"],
                              ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (self.sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        """Walk the tree top-down and check every yielded entry."""
        walked = list(os.walk(self.walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        """Removing a name from dirs must stop os.walk() descending into it."""
        walked = []
        for root, dirs, files in os.walk(self.walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))
        walked[1][-1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        """Walk the tree bottom-up and check every yielded entry."""
        walked = list(os.walk(self.walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], self.sub2_tree)

    def test_walk_symlink(self):
        """With followlinks=True, os.walk() must descend into the symlink."""
        if not support.can_symlink():
            self.skipTest("need symlink support")
        for root, dirs, files in os.walk(self.walk_path, followlinks=True):
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is listed under dirs but has to
                # be removed like a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000775
Charles-François Natali7372b062012-02-05 15:15:38 +0100776
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """Check os.fwalk() results against os.walk() results.

        walk_kwargs and fwalk_kwargs are the keyword arguments for the two
        functions.  Both are copied before use, and the comparison is run
        for every combination of topdown and symlink-following.  Every
        directory yielded by os.fwalk() must have been yielded by os.walk()
        with the same sets of sub-directories and files.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # The two functions spell the symlink option differently.
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {
                root: (set(dirs), set(files))
                for root, dirs, files in os.walk(**walk_kwargs)
            }

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor *before* entering the try block: if os.open()
        # fails there is nothing to close, and the finally clause must not
        # hide the real error behind a NameError.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup, removing each entry relative to its directory descriptor
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Do not follow symlinks: a link to a directory has to be
                # unlinked, not rmdir'ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
850
851
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a freshly created TESTFN."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the umask even when an assertion fails, so that a failure
        # here cannot change the permissions used by later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
    def test_chown_uid_gid_arguments_must_be_index(self):
        # Named "st" so the result does not shadow the stat module.
        st = os.stat(support.TESTFN)
        uid = st.st_uid
        gid = st.st_gid
        # Numbers that are not integers must be rejected for both arguments.
        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            with self.assertRaises(OSError):
                # Should fail when the bit is not already set when demanded.
                os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even when an assertion fails: tearDown() calls
        # os.removedirs(), which cannot remove a regular file.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
942
Andrew Svetlov405faed2012-12-25 12:18:09 +0200943
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on the nested tree TESTFN/dira/dirb."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        """Create TESTFN/dira/dirb and return (dira, dirb)."""
        outer = os.path.join(support.TESTFN, 'dira')
        inner = os.path.join(outer, 'dirb')
        os.mkdir(outer)
        os.mkdir(inner)
        return outer, inner

    @staticmethod
    def _write_text_file(directory):
        """Create file.txt inside directory so that it is no longer empty."""
        with open(os.path.join(directory, 'file.txt'), 'w') as f:
            f.write('text')

    def test_remove_all(self):
        # Every directory is empty, so the whole chain goes away.
        outer, inner = self._make_nested_dirs()
        os.removedirs(inner)
        for gone in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the removal after dirb.
        outer, inner = self._make_nested_dirs()
        self._write_text_file(outer)
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for kept in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb makes the very first removal fail.
        outer, inner = self._make_nested_dirs()
        self._write_text_file(inner)
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for kept in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
985
986
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull path."""

    def test_devnull(self):
        """os.devnull accepts writes, and reading from it returns no data."""
        # The with statement closes the file; no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000994
Andrew Svetlov405faed2012-12-25 12:18:09 +0200995
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        """os.urandom(n) returns exactly n bytes, including for n == 0."""
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        """Two consecutive calls must not return the same data."""
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return the count bytes of os.urandom() output from a child Python.

        The child writes the raw bytes to its stdout; the length of what
        was captured is checked against count before it is returned.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare against the requested size, not a hard-coded 16, so the
        # helper is correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        """Two separate processes must not produce the same data."""
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open a new file
        # descriptor (the child expects EMFILE).
        # We spawn a new process to make the test more robust (if the file
        # descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001047
1048
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager that replaces os.execv and os.execve with recording stubs.

    The value bound by ``with ... as`` is a list that receives one tuple
    (function name, first arg, remaining args) per call.  The stubs always
    raise, since a real exec call would never return.  When defpath is not
    None, os.defpath is overridden too.  All three attributes are restored
    on exit.
    """
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Remember the real attributes so they can be put back afterwards.
    saved = {attr: getattr(os, attr)
             for attr in ('execv', 'execve', 'defpath')}
    try:
        os.execv = fake_execv
        os.execve = fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        for attr, original in saved.items():
            setattr(os, attr, original)
1081
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list is rejected.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Exercise os._execvpe() with a str or bytes program name.

        test_type is either str or bytes and selects the type of the
        program name.  The real exec functions are replaced by
        _execvpe_mockup(), so only the arguments os._execvpe() would pass
        to them are checked.
        """
        use_bytes = test_type is bytes
        search_dir = os.sep + 'absolutepath'
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            # The expected path recorded for execve is bytes everywhere
            # except on Windows.
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = search_dir
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        # Despite its name, this also covers the bytes variant wherever
        # the platform is not Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001145
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001146
class Win32ErrorTests(unittest.TestCase):
    """Each os call below must raise OSError for the given TESTFN state."""

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # Here TESTFN exists as an open regular file, so mkdir must fail.
        handle = open(support.TESTFN, "w")
        try:
            with handle:
                with self.assertRaises(OSError):
                    os.mkdir(support.TESTFN)
        finally:
            # The file is closed by now; remove it.
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001170
class TestInvalidFD(unittest.TestCase):
    """Check that os functions reject an invalid file descriptor.

    Most tests pass the descriptor returned by support.make_bad_fd() to an
    os function and expect OSError with errno EBADF.  A test whose os
    function is missing on this platform is reported as skipped rather
    than silently passing.
    """

    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    # "close" is deliberately left out of singles because it doesn't raise
    # an exception on some platforms.

    def get_single(f):
        # Build the test method for the os function named f, which takes
        # the file descriptor as its only argument.
        def helper(self):
            if not hasattr(os, f):
                self.skipTest("test needs os.%s()" % f)
            self.check(getattr(os, f))
        return helper
    for f in singles:
        # Inside a class body, locals() is the class namespace, so this
        # defines one test_<name> method per entry.
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require OSError with errno EBADF."""
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() reports False for a bad descriptor instead of raising.
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'),
                         'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertIsNone(os.closerange(fd, fd + i-1))

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'),
                         'test needs os.fpathconf()')
    def test_fpathconf(self):
        # os.pathconf() is called with the bad descriptor in place of a path.
        self.check(os.pathconf, "PC_NAME_MAX")
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'),
                         'test needs os.ftruncate()')
    def test_ftruncate(self):
        # os.truncate() is called with the bad descriptor in place of a path.
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'),
                         'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001250
Brian Curtin1b9df392010-11-24 20:24:31 +00001251
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Tests may rebind file1/file2, so clean up whatever they name now.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        """Create file1, hard-link it to file2, and check both are one file."""
        with open(file1, "w") as f1:
            f1.write("test")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() is used here, as in test_unicode_name, to build the
        # bytes file names.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1288
Thomas Wouters477c8d52006-05-27 19:21:47 +00001289if sys.platform != 'win32':
1290 class Win32ErrorTests(unittest.TestCase):
1291 pass
1292
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001293 class PosixUidGidTests(unittest.TestCase):
1294 if hasattr(os, 'setuid'):
1295 def test_setuid(self):
1296 if os.getuid() != 0:
Andrew Svetlov8b33dd82012-12-24 19:58:48 +02001297 self.assertRaises(OSError, os.setuid, 0)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001298 self.assertRaises(OverflowError, os.setuid, 1<<32)
1299
1300 if hasattr(os, 'setgid'):
1301 def test_setgid(self):
Stefan Krahebee49a2013-01-17 15:31:00 +01001302 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
Andrew Svetlov8b33dd82012-12-24 19:58:48 +02001303 self.assertRaises(OSError, os.setgid, 0)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001304 self.assertRaises(OverflowError, os.setgid, 1<<32)
1305
1306 if hasattr(os, 'seteuid'):
1307 def test_seteuid(self):
1308 if os.getuid() != 0:
Andrew Svetlov8b33dd82012-12-24 19:58:48 +02001309 self.assertRaises(OSError, os.seteuid, 0)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001310 self.assertRaises(OverflowError, os.seteuid, 1<<32)
1311
1312 if hasattr(os, 'setegid'):
1313 def test_setegid(self):
Stefan Krahebee49a2013-01-17 15:31:00 +01001314 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
Andrew Svetlov8b33dd82012-12-24 19:58:48 +02001315 self.assertRaises(OSError, os.setegid, 0)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001316 self.assertRaises(OverflowError, os.setegid, 1<<32)
1317
1318 if hasattr(os, 'setreuid'):
1319 def test_setreuid(self):
1320 if os.getuid() != 0:
Andrew Svetlov8b33dd82012-12-24 19:58:48 +02001321 self.assertRaises(OSError, os.setreuid, 0, 0)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001322 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1323 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001324
1325 def test_setreuid_neg1(self):
1326 # Needs to accept -1. We run this in a subprocess to avoid
1327 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001328 subprocess.check_call([
1329 sys.executable, '-c',
1330 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001331
1332 if hasattr(os, 'setregid'):
1333 def test_setregid(self):
Stefan Krahebee49a2013-01-17 15:31:00 +01001334 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
Andrew Svetlov8b33dd82012-12-24 19:58:48 +02001335 self.assertRaises(OSError, os.setregid, 0, 0)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001336 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1337 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001338
1339 def test_setregid_neg1(self):
1340 # Needs to accept -1. We run this in a subprocess to avoid
1341 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001342 subprocess.check_call([
1343 sys.executable, '-c',
1344 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001345
1346 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001347 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001348 if support.TESTFN_UNENCODABLE:
1349 self.dir = support.TESTFN_UNENCODABLE
Victor Stinner8b219b22012-11-06 23:23:43 +01001350 elif support.TESTFN_NONASCII:
1351 self.dir = support.TESTFN_NONASCII
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001352 else:
1353 self.dir = support.TESTFN
1354 self.bdir = os.fsencode(self.dir)
1355
1356 bytesfn = []
1357 def add_filename(fn):
1358 try:
1359 fn = os.fsencode(fn)
1360 except UnicodeEncodeError:
1361 return
1362 bytesfn.append(fn)
1363 add_filename(support.TESTFN_UNICODE)
1364 if support.TESTFN_UNENCODABLE:
1365 add_filename(support.TESTFN_UNENCODABLE)
Victor Stinner8b219b22012-11-06 23:23:43 +01001366 if support.TESTFN_NONASCII:
1367 add_filename(support.TESTFN_NONASCII)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001368 if not bytesfn:
1369 self.skipTest("couldn't create any non-ascii filename")
1370
1371 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001372 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001373 try:
1374 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +02001375 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +00001376 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001377 if fn in self.unicodefn:
1378 raise ValueError("duplicate filename")
1379 self.unicodefn.add(fn)
1380 except:
1381 shutil.rmtree(self.dir)
1382 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001383
1384 def tearDown(self):
1385 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001386
1387 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001388 expected = self.unicodefn
1389 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001390 self.assertEqual(found, expected)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001391 # test listdir without arguments
1392 current_directory = os.getcwd()
1393 try:
1394 os.chdir(os.sep)
1395 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1396 finally:
1397 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001398
1399 def test_open(self):
1400 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001401 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +00001402 f.close()
1403
Victor Stinnere4110dc2013-01-01 23:05:55 +01001404 @unittest.skipUnless(hasattr(os, 'statvfs'),
1405 "need os.statvfs()")
1406 def test_statvfs(self):
1407 # issue #9645
1408 for fn in self.unicodefn:
1409 # should not fail with file not found error
1410 fullname = os.path.join(self.dir, fn)
1411 os.statvfs(fullname)
1412
Martin v. Löwis011e8422009-05-05 04:43:17 +00001413 def test_stat(self):
1414 for fn in self.unicodefn:
1415 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001416else:
1417 class PosixUidGidTests(unittest.TestCase):
1418 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001419 class Pep383Tests(unittest.TestCase):
1420 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001421
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Windows-only tests for os.kill() with exit codes and console events."""

    def _kill(self, sig):
        """Spawn a child interpreter, wait until it reports readiness, then
        os.kill() it with *sig* and check that its exit code equals *sig*."""
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        # The child writes *msg* to stdout, flushes, then blocks in input().
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to 100 times, sleeping 0.1s between attempts.
        count, max = 0, 100
        while count < max and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop ends without ``break``: either the poll
            # budget ran out or the child exited before anything was seen.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Start the console-handler helper script in a new process group,
        wait for it to flag readiness through a tagged one-byte mmap, send it
        *event* with os.kill() and fail (naming *name*) if it does not stop."""
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max = 0, 100
        while count < max and proc.poll() is None:
            # The loop stops once the shared byte reads 1.
            # NOTE(review): presumably set by win_console_handler.py when its
            # handler is installed -- that script is not visible here.
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() is falsy both while the child is still running (None) and
        # when it exited with code 0; either case is treated as "not stopped".
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1536
1537
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Windows-only tests for os.symlink() on files, directories and
    directory links whose target does not exist."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions use unittest assertions rather than ``assert``
        # statements so they are still enforced under ``python -O``.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): the dangling link itself must be detected, not its target.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() on *link* matches *target* while lstat() on
        *link* differs, for both the str and the bytes form of the name."""
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        """os.stat() through a relative symlink must work from the link's
        own directory as well as from directories above and below it."""
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")

        os.mkdir(level1)
        # Registered only once level1 exists, so the cleanup never touches
        # names that were not created and never masks the original error.
        self.addCleanup(shutil.rmtree, level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))

        with open(file1, "w") as f:
            f.write("file1")

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            # Leave the tree before the registered cleanup removes it.
            os.chdir(orig_dir)
1655
Brian Curtind40e6f72010-07-08 21:39:08 +00001656
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Symlinks whose target is given relative to the link's directory."""

    def setUp(self):
        # Raw docstring: the tree drawing below contains a backslash.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion survives ``python -O``; a bare assert does not.
        self.assertTrue(os.path.isdir(src))
1687
1688
class FSEncodingTests(unittest.TestCase):
    """Round-trip behaviour of os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() come back
        # unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding: nothing to
                # round-trip for this candidate.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001702
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001703
Brett Cannonefb00c02012-02-29 18:31:31 -05001704
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The name reported for fd 0 must be one the codecs registry knows.
        reported = os.device_encoding(0)
        self.assertIsNotNone(reported)
        self.assertTrue(codecs.lookup(reported))
1718
1719
class PidTests(unittest.TestCase):
    """Tests for process-id queries in the os module."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # A child interpreter prints its parent's pid; that must be ours.
        command = [sys.executable, '-c',
                   'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1729
1730
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """os.getlogin() check; currently skipped unconditionally (see above)."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        self.assertNotEqual(len(os.getlogin()), 0)
1739
1740
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own nice value by one, read it back, then try to
        # restore the starting value.
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest(
      "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                # EACCES while restoring is tolerated; anything else is not.
                if err.errno != errno.EACCES:
                    raise
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001762 raise
1763
1764
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening asyncore dispatcher that runs its loop in its own thread.

        Each accepted connection gets a ``Handler`` that buffers everything
        it receives; the most recent handler is kept in
        ``handler_instance``.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel collecting received bytes in
            ``in_buffer``."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting pushed to the peer as soon as the handler exists.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Bare raise: propagate the exception currently being handled.
                raise

        def __init__(self, address):
            """Bind to *address*, start listening and record the bound
            host and port in ``self.host`` / ``self.port``."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True between run() starting and stop() being called."""
            return self._active

        def start(self):
            """Start the server thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to finish and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            """Thread body: poll asyncore until stopped or no sockets remain,
            then close every asyncore channel."""
            self._active = True
            # Releases the start() call that is waiting on this event.
            self.__flag.set()
            while self._active and asyncore.socket_map:
                self._active_lock.acquire()
                asyncore.loop(timeout=0.001, count=1)
                self._active_lock.release()
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        # Read events on the listening dispatcher get the same treatment.
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Bare raise: propagate the exception currently being handled.
            raise
1848
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001849
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001850@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001851@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1852class TestSendfile(unittest.TestCase):
1853
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001854 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001855 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001856 not sys.platform.startswith("solaris") and \
1857 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001858
1859 @classmethod
1860 def setUpClass(cls):
1861 with open(support.TESTFN, "wb") as f:
1862 f.write(cls.DATA)
1863
1864 @classmethod
1865 def tearDownClass(cls):
1866 support.unlink(support.TESTFN)
1867
1868 def setUp(self):
1869 self.server = SendfileTestServer((support.HOST, 0))
1870 self.server.start()
1871 self.client = socket.socket()
1872 self.client.connect((self.server.host, self.server.port))
1873 self.client.settimeout(1)
1874 # synchronize by waiting for "220 ready" response
1875 self.client.recv(1024)
1876 self.sockno = self.client.fileno()
1877 self.file = open(support.TESTFN, 'rb')
1878 self.fileno = self.file.fileno()
1879
1880 def tearDown(self):
1881 self.file.close()
1882 self.client.close()
1883 if self.server.running:
1884 self.server.stop()
1885
1886 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1887 """A higher level wrapper representing how an application is
1888 supposed to use sendfile().
1889 """
1890 while 1:
1891 try:
1892 if self.SUPPORT_HEADERS_TRAILERS:
1893 return os.sendfile(sock, file, offset, nbytes, headers,
1894 trailers)
1895 else:
1896 return os.sendfile(sock, file, offset, nbytes)
1897 except OSError as err:
1898 if err.errno == errno.ECONNRESET:
1899 # disconnected
1900 raise
1901 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1902 # we have to retry send data
1903 continue
1904 else:
1905 raise
1906
1907 def test_send_whole_file(self):
1908 # normal send
1909 total_sent = 0
1910 offset = 0
1911 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001912 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001913 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1914 if sent == 0:
1915 break
1916 offset += sent
1917 total_sent += sent
1918 self.assertTrue(sent <= nbytes)
1919 self.assertEqual(offset, total_sent)
1920
1921 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001922 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001923 self.client.close()
1924 self.server.wait()
1925 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001926 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001927 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001928
1929 def test_send_at_certain_offset(self):
1930 # start sending a file at a certain offset
1931 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001932 offset = len(self.DATA) // 2
1933 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001934 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001935 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001936 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1937 if sent == 0:
1938 break
1939 offset += sent
1940 total_sent += sent
1941 self.assertTrue(sent <= nbytes)
1942
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001943 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001944 self.client.close()
1945 self.server.wait()
1946 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001947 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001948 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001949 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001950 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001951
1952 def test_offset_overflow(self):
1953 # specify an offset > file size
1954 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001955 try:
1956 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1957 except OSError as e:
1958 # Solaris can raise EINVAL if offset >= file length, ignore.
1959 if e.errno != errno.EINVAL:
1960 raise
1961 else:
1962 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001963 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001964 self.client.close()
1965 self.server.wait()
1966 data = self.server.handler_instance.get_data()
1967 self.assertEqual(data, b'')
1968
1969 def test_invalid_offset(self):
1970 with self.assertRaises(OSError) as cm:
1971 os.sendfile(self.sockno, self.fileno, -1, 4096)
1972 self.assertEqual(cm.exception.errno, errno.EINVAL)
1973
1974 # --- headers / trailers tests
1975
1976 if SUPPORT_HEADERS_TRAILERS:
1977
1978 def test_headers(self):
1979 total_sent = 0
1980 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1981 headers=[b"x" * 512])
1982 total_sent += sent
1983 offset = 4096
1984 nbytes = 4096
1985 while 1:
1986 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1987 offset, nbytes)
1988 if sent == 0:
1989 break
1990 total_sent += sent
1991 offset += sent
1992
1993 expected_data = b"x" * 512 + self.DATA
1994 self.assertEqual(total_sent, len(expected_data))
1995 self.client.close()
1996 self.server.wait()
1997 data = self.server.handler_instance.get_data()
1998 self.assertEqual(hash(data), hash(expected_data))
1999
2000 def test_trailers(self):
2001 TESTFN2 = support.TESTFN + "2"
Victor Stinner5e4d6392013-08-15 11:57:02 +02002002 file_data = b"abcdef"
Brett Cannonb6376802011-03-15 17:38:22 -04002003 with open(TESTFN2, 'wb') as f:
Victor Stinner5e4d6392013-08-15 11:57:02 +02002004 f.write(file_data)
Brett Cannonb6376802011-03-15 17:38:22 -04002005 with open(TESTFN2, 'rb')as f:
2006 self.addCleanup(os.remove, TESTFN2)
Victor Stinner5e4d6392013-08-15 11:57:02 +02002007 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2008 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002009 self.client.close()
2010 self.server.wait()
2011 data = self.server.handler_instance.get_data()
Victor Stinner5e4d6392013-08-15 11:57:02 +02002012 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002013
2014 if hasattr(os, "SF_NODISKIO"):
2015 def test_flags(self):
2016 try:
2017 os.sendfile(self.sockno, self.fileno, 0, 4096,
2018 flags=os.SF_NODISKIO)
2019 except OSError as err:
2020 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2021 raise
2022
2023
def supports_extended_attributes():
    """Return True if extended attributes are usable for these tests.

    False is returned when os.setxattr() is missing or when setting a
    ``user.*`` attribute on a scratch file (support.TESTFN) fails with
    OSError.  A platform release of the form ``2.6.N`` is accepted only
    when N >= 39.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: match a literal "2.6." prefix only.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2039
2040
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise the os.*xattr() functions against support.TESTFN."""

    def tearDown(self):
        # Remove the scratch file the scenario leaves behind.
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr,
                          listxattr, **kwargs):
        """Run the get/set/remove/list scenario on a fresh scratch file.

        ``s`` converts an attribute name given as str into the type under
        test.  ``kwargs`` are forwarded to getxattr, setxattr and
        removexattr; listxattr is always called with the path only.
        """
        path = support.TESTFN
        open(path, "wb").close()

        def expect_errno(code, func, *args):
            # func(path, *args, **kwargs) must fail with OSError(code).
            with self.assertRaises(OSError) as caught:
                func(path, *args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        name = s("user.test")
        name2 = s("user.test2")

        # Nothing is set yet.
        expect_errno(errno.ENODATA, getxattr, name)
        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create the attribute with an empty value.
        setxattr(path, name, b"", **kwargs)
        expected = set(initial) | {"user.test"}
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")

        # XATTR_REPLACE overwrites an existing attribute ...
        setxattr(path, name, b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")
        # ... XATTR_CREATE refuses an existing one, and XATTR_REPLACE
        # refuses a missing one.
        expect_errno(errno.EEXIST, setxattr, name, b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr, name2, b"bye",
                     os.XATTR_REPLACE)

        setxattr(path, name2, b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removing the first attribute leaves only the second one.
        removexattr(path, name, **kwargs)
        expect_errno(errno.ENODATA, getxattr, name)
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, name2, **kwargs), b"foo")

        # Round-trip a 1 KiB value.
        big_value = b"a"*1024
        setxattr(path, name, big_value, **kwargs)
        self.assertEqual(getxattr(path, name, **kwargs), big_value)
        removexattr(path, name, **kwargs)

        # Many attributes at once; these names are always passed as str.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr in many:
            setxattr(path, attr, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario with str names, then again with bytes names."""
        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(lambda text: bytes(text, "ascii"),
                               *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        def through_fd(func, mode):
            # Adapt a path-taking call so it receives an open descriptor.
            def call(path, *args):
                with open(path, mode) as fp:
                    return func(fp.fileno(), *args)
            return call

        self._check_xattrs(through_fd(os.getxattr, "rb"),
                           through_fd(os.setxattr, "wb"),
                           through_fd(os.removexattr, "wb"),
                           through_fd(os.listxattr, "rb"))
2116
2117
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Bytes filenames must trigger DeprecationWarning on Windows."""

    def test_deprecated(self):
        import nt
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            # Turn the warning into an exception so it can be asserted on.
            warnings.simplefilter("error", DeprecationWarning)
            calls = [
                (nt._getfullpathname, bytes_name),
                (nt._isdir, bytes_name),
                (os.access, bytes_name, os.R_OK),
                (os.chdir, bytes_name),
                (os.chmod, bytes_name, 0o777),
                (os.getcwdb,),
                (os.link, bytes_name, bytes_name),
                (os.listdir, bytes_name),
                (os.lstat, bytes_name),
                (os.mkdir, bytes_name),
                (os.open, bytes_name, os.O_RDONLY),
                (os.rename, bytes_name, bytes_name),
                (os.rmdir, bytes_name),
                (os.startfile, bytes_name),
                (os.stat, bytes_name),
                (os.unlink, bytes_name),
                (os.utime, bytes_name),
            ]
            for func, *arguments in calls:
                with self.assertRaises(DeprecationWarning):
                    func(*arguments)

    @support.skip_unless_symlink
    def test_symlink(self):
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(bytes_name, bytes_name)
2153
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002154
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Sanity checks for os.get_terminal_size()."""

    def _get_terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args) or skip the running test.

        The test is skipped when the call fails with EINVAL or ENOTTY, or
        with any OSError on win32; every other OSError is re-raised.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stderr is discarded so a failing stty does not clutter the
            # test output.
            output = subprocess.check_output(['stty', 'size'],
                                             stderr=subprocess.DEVNULL)
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            # Missing, non-executable or failing stty: nothing to compare.
            self.skipTest("stty invocation failed")
        size = output.decode().split()
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2197
2198
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the object that was passed in."""

    def setUp(self):
        class Str(str):
            pass

        # Prefer the unencodable / undecodable names when support has them.
        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)

        self.bytes_filenames = [encoded, memoryview(encoded)]
        self.unicode_filenames = [decoded, Str(decoded)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames

        # Each entry is (filenames to try, function, *extra arguments).
        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (as_text, os.listdir,),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms, with their extra
        # positional arguments.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((as_text if on_windows else every, os.readlink,))

        for candidates, func, *extra_args in funcs:
            for name in candidates:
                try:
                    func(name, *extra_args)
                except OSError as err:
                    # The exception must carry the very object passed in.
                    self.assertIs(err.filename, name)
                else:
                    self.fail("No exception thrown by {}".format(func))
2295
class CPUCountTests(unittest.TestCase):
    """Check the value returned by os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            # os.cpu_count() returned None, so there is nothing to verify.
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2304
Victor Stinnerdaf45552013-08-28 00:53:59 +02002305
class FDInheritanceTests(unittest.TestCase):
    """Check the inheritable flag of descriptors created through os."""

    def _open_source(self):
        # Open this source file read-only; the descriptor is closed when
        # the test finishes.
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        return handle

    def test_get_set_inheritable(self):
        handle = self._open_source()
        self.assertEqual(os.get_inheritable(handle), False)

        os.set_inheritable(handle, True)
        self.assertEqual(os.get_inheritable(handle), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        handle = self._open_source()
        self.assertEqual(os.get_inheritable(handle), False)

        # clear FD_CLOEXEC flag
        current = fcntl.fcntl(handle, fcntl.F_GETFD)
        fcntl.fcntl(handle, fcntl.F_SETFD, current & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(handle), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        handle = self._open_source()

        def cloexec_bit():
            return fcntl.fcntl(handle, fcntl.F_GETFD) & fcntl.FD_CLOEXEC

        self.assertEqual(cloexec_bit(), fcntl.FD_CLOEXEC)

        os.set_inheritable(handle, True)
        self.assertEqual(cloexec_bit(), 0)

    def test_open(self):
        handle = self._open_source()
        self.assertEqual(os.get_inheritable(handle), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertEqual(os.get_inheritable(read_end), False)
        self.assertEqual(os.get_inheritable(write_end), False)

    def test_dup(self):
        original = self._open_source()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = self._open_source()

        scenarios = (
            ({}, True),                       # inheritable by default
            ({"inheritable": False}, False),  # force non-inheritable
        )
        for options, expected in scenarios:
            target = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(source, target, **options)
                self.assertEqual(os.get_inheritable(target), expected)
            finally:
                os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
2388
2389
@support.reap_threads
def test_main():
    """Run the test case classes listed below via support.run_unittest()."""
    test_cases = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        NonLocalSymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
        OSErrorTests,
        RemoveDirsTests,
        CPUCountTests,
        FDInheritanceTests,
    )
    support.run_unittest(*test_cases)


if __name__ == "__main__":
    test_main()