blob: 57de993319ba6e1cc4c1fd7547d09bbe4c223f23 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but it
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000027try:
28 import threading
29except ImportError:
30 threading = None
Georg Brandl2daf6ae2012-02-20 19:54:16 +010031from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000032
# Ask os.stat() for float timestamps before probing its resolution.
# os.stat_float_times() is deprecated (hence the warning filter) and is not
# available on every interpreter, so it is only called when it exists.
if hasattr(os, "stat_float_times"):
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        os.stat_float_times(True)
st = os.stat(__file__)
stat_supports_subsecond = (
    # check if float and int timestamps are different
    # (tuple indexes 7, 8 and 9 hold the integer atime, mtime and ctime)
    (st.st_atime != st[7])
    or (st.st_mtime != st[8])
    or (st.st_ctime != st[9]))
42
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
#
# The flag is False whenever sys.thread_info is missing or reports no
# version string.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False)
Brian Curtineb24d742010-04-12 17:16:38 +000051
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """File-descriptor level tests that create and remove support.TESTFN."""

    def setUp(self):
        # Remove any leftover TESTFN; the same function doubles as tearDown.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A rename() rejected with TypeError must leave the reference count
        # of the path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # Rewind the descriptor so os.read() sees the flushed bytes.
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails,
            # so it is not leaked and tearDown can remove the file.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command given by *args and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the descriptor with os.fdopen(*args)
        # and close the resulting file object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        # The source must disappear and the destination must take over its
        # content.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
158
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200159
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on os.stat()/os.statvfs() results and on os.utime()."""

    def setUp(self):
        # Fixture: directory TESTFN containing the three-byte file "f1".
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        # Shared body of the stat attribute tests; fname is passed straight
        # to os.stat().
        if not hasattr(os, "stat"):
            return

        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                # Time attributes are compared after truncation to int.
                if name.endswith("TIME"):
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                 result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing far past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # Either outcome is accepted here: a TypeError is tolerated but not
        # required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            return

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                return
            # Any other failure is a real error; report it instead of
            # falling through with 'result' unbound.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # Either outcome is accepted here: a TypeError is tolerated but not
        # required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def _test_utime(self, filename, attr, utime, delta):
        # Shared body of the utime tests.
        #   filename -- path whose timestamps are set and read back
        #   attr     -- attr(stat_result, "st_mtime") style accessor
        #   utime    -- utime(filename, times) callable under test
        #   delta    -- tolerance between two "set to now" calls, in the
        #               unit returned by attr
        #
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Read back the path that was just touched (not always TESTFN).
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)

    def _test_utime_ns(self, set_times_ns, test_dir=True):
        # Run _test_utime() against the *_ns stat fields, on the file and,
        # unless test_dir is false, on the directory as well.
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    # Skip decorators keyed on which optional os.utime() features are
    # advertised by the os.supports_* sets.
    requires_utime_dir_fd = unittest.skipUnless(
                                os.utime in os.supports_dir_fd,
                                "dir_fd support for utime required for this test.")
    requires_utime_fd = unittest.skipUnless(
                                os.utime in os.supports_fd,
                                "fd support for utime required for this test.")
    requires_utime_nofollow_symlinks = unittest.skipUnless(
                                os.utime in os.supports_follow_symlinks,
                                "follow_symlinks support for utime required for this test.")

    @requires_utime_nofollow_symlinks
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.utime(file, ns=times, follow_symlinks=False)
        self._test_utime_ns(lutimes_ns)

    @requires_utime_fd
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.utime(f.fileno(), ns=times)
        # The setter opens its argument with open(), so only the regular
        # file is exercised.
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        # Passing both a times tuple and ns= to os.<name> must raise
        # ValueError.
        with self.assertRaises(ValueError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)

    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        # Shared body of the *_subsecond tests: set_time_func(filename,
        # atime, mtime) must store the times with millisecond accuracy.
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        # Reset both timestamps first so stale values cannot satisfy the
        # checks below.
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        # Only call the deprecated switch where the interpreter has it.
        if hasattr(os, "stat_float_times"):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    # NOTE: same body as test_futimes_subsecond; kept as a separate test
    # name.
    @requires_utime_fd
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                         times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_utime_nofollow_symlinks
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime), follow_symlinks=False)
        self._test_utime_subsecond(set_time)

    # NOTE: same body as test_futimesat_subsecond; kept as a separate test
    # name.
    @requires_utime_dir_fd
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                         times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Class-body helper (not a method): return the file system name
            # reported by GetVolumeInformationW for the drive holding path,
            # or None when the call fails.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
475
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000476from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000477
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment(s) so tearDown can restore them, then
        # load the reference items into os.environ.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for name, text in self._reference().items():
            os.environ[name] = text

    def tearDown(self):
        # Put back exactly what setUp saved.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        # The "empty mapping" is os.environ itself, emptied in place.
        os.environ.clear()
        return os.environ

    # Bug 1110478
    def test_update2(self):
        os.environ.clear()
        if not os.path.exists("/bin/sh"):
            return
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as pipe:
            output = pipe.read().strip()
            self.assertEqual(output, "World")

    def test_os_popen_iter(self):
        if not os.path.exists("/bin/sh"):
            return
        with os.popen(
                "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            # Nothing may follow the third line.
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, text in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(text), str)

    def test_items(self):
        for name, text in self._reference().items():
            self.assertEqual(os.environ.get(name), text)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        pairs = ', '.join(
            '{!r}: {!r}'.format(name, text)
            for name, text in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(pairs))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily swap os.environ for a plain dict.
        original_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                                 ['abc'])
        self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                                 ['abc'])
        self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                                 ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            text_bytes = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], text_bytes)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        raw_str = raw.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], raw_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key, expected = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            key, expected = 'key=', OSError
        self.assertRaises(expected, os.environ.__delitem__, key)
Victor Stinnerb3f82682011-11-22 22:30:19 +0100617 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100618
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def setUp(self):
        """Build the fixture tree under support.TESTFN.

        Only creates files, directories and (where supported) symlinks.  The
        traversal assertions live in test_traversal(): an assertion failing
        inside setUp() would prevent tearDown() from running and leave the
        tree behind, and a class with no test_* method is never run at all.
        """
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(t2_path, "tmp4")
        broken_link_path = join(sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if support.can_symlink():
            if os.name == 'nt':
                # Windows needs to be told that the target is a directory.
                def symlink_to_dir(src, dest):
                    os.symlink(src, dest, True)
            else:
                symlink_to_dir = os.symlink
            symlink_to_dir(os.path.abspath(t2_path), self.link_path)
            symlink_to_dir('broken', broken_link_path)
            # The dangling link is reported among the files, the valid
            # directory link among the directories.
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_traversal(self):
        # Check top-down, pruned, bottom-up and link-following walks of the
        # tree built by setUp().
        walk_path = self.walk_path
        sub1_path = self.sub1_path
        sub11_path = self.sub11_path
        sub2_tree = self.sub2_tree

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        walked[1][-1].sort()
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == self.link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is listed in dirs but must be
                # unlinked, not rmdir'ed.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000738
Charles-François Natali7372b062012-02-05 15:15:38 +0100739
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        walk_kwargs and fwalk_kwargs are the keyword arguments for os.walk()
        and os.fwalk(); every topdown/followlinks combination is layered on
        top of them.  The caller's dicts are left untouched.
        """
        for topdown, followlinks in itertools.product((True, False), repeat=2):
            walk_args = dict(walk_kwargs,
                             topdown=topdown, followlinks=followlinks)
            fwalk_args = dict(fwalk_kwargs,
                              topdown=topdown, followlinks=followlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_args):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_args):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Acquire the descriptor before entering the try block: if os.open()
        # fails there is nothing to close, and the finally clause must not
        # hide the real error behind an unbound-variable error.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, followlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None, followlinks
            for root, dirs, files, rootfd in os.fwalk(*args):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Do not follow links: a symlink to a directory has to be
                # unlinked rather than rmdir'ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
812
813
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the umask even when an assertion fails, so a failure here
        # cannot change the permissions used by every later test.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            with self.assertRaises(OSError):
                # Should fail when the bit is not already set when demanded.
                os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Always remove the file: tearDown() uses os.removedirs(), which
        # cannot delete a regular file left behind by a failed assertion.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
890
class DevNullTests(unittest.TestCase):
    """os.devnull must swallow writes and read back as empty."""

    def test_devnull(self):
        # Anything written to the null device is discarded ...
        with open(os.devnull, 'wb') as sink:
            sink.write(b'hello')
        # ... and reading from it hits end-of-file immediately.
        with open(os.devnull, 'rb') as source:
            content = source.read()
        self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000898
Guido van Rossume7ba4952007-06-06 23:52:48 +0000899class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +0100900 def test_urandom_length(self):
901 self.assertEqual(len(os.urandom(0)), 0)
902 self.assertEqual(len(os.urandom(1)), 1)
903 self.assertEqual(len(os.urandom(10)), 10)
904 self.assertEqual(len(os.urandom(100)), 100)
905 self.assertEqual(len(os.urandom(1000)), 1000)
906
907 def test_urandom_value(self):
908 data1 = os.urandom(16)
909 data2 = os.urandom(16)
910 self.assertNotEqual(data1, data2)
911
912 def get_urandom_subprocess(self, count):
913 code = '\n'.join((
914 'import os, sys',
915 'data = os.urandom(%s)' % count,
916 'sys.stdout.buffer.write(data)',
917 'sys.stdout.buffer.flush()'))
918 out = assert_python_ok('-c', code)
919 stdout = out[1]
920 self.assertEqual(len(stdout), 16)
921 return stdout
922
923 def test_urandom_subprocess(self):
924 data1 = self.get_urandom_subprocess(16)
925 data2 = self.get_urandom_subprocess(16)
926 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000927
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager replacing os.execv and os.execve with recording stubs.

    Yields the list of recorded calls.  Each stub raises after recording,
    because a real exec call would never return.  If defpath is given,
    os.defpath is overridden as well.  All three attributes are restored
    on exit.
    """
    # One (function name, first arg, remaining args) tuple per intercepted
    # call to execv or execve.
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    try:
        saved = os.execv, os.execve, os.defpath
        os.execv, os.execve = fake_execv, fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv, os.execve, os.defpath = saved
960
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with str or bytes program names (selected by
        # test_type) while execv/execve are replaced by recording stubs.
        use_bytes = test_type is bytes
        program_path = os.sep + 'absolutepath'
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = dict(env)
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised off Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001024
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001025
class Win32ErrorTests(unittest.TestCase):
    """Operations on an unusable path must raise WindowsError."""

    def test_rename(self):
        with self.assertRaises(WindowsError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir() must fail when a regular file already owns the name.
        handle = open(support.TESTFN, "w")
        try:
            with handle:
                self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
        finally:
            # The file is closed by now, so it can be deleted.
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001049
class TestInvalidFD(unittest.TestCase):
    """fd-based os functions must fail with EBADF on an invalid descriptor."""

    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    # close is omitted because it doesn't raise an exception on some
    # platforms.
    def get_single(f):
        # Build the test method for the os function named f, which takes
        # the file descriptor as its only argument.
        def helper(self):
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper
    # Inject one generated test_<name> method per entry into the class
    # namespace.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError carrying EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    def test_isatty(self):
        if not hasattr(os, "isatty"):
            return
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    def test_closerange(self):
        if not hasattr(os, "closerange"):
            return
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                continue
            # fd+i is a live descriptor: the invalid range ends before it.
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    def test_dup2(self):
        if not hasattr(os, "dup2"):
            return
        self.check(os.dup2, 20)

    def test_fchmod(self):
        if not hasattr(os, "fchmod"):
            return
        self.check(os.fchmod, 0)

    def test_fchown(self):
        if not hasattr(os, "fchown"):
            return
        self.check(os.fchown, -1, -1)

    def test_fpathconf(self):
        if not hasattr(os, "fpathconf"):
            return
        # The path-based function is handed the descriptor as well.
        self.check(os.pathconf, "PC_NAME_MAX")
        self.check(os.fpathconf, "PC_NAME_MAX")

    def test_ftruncate(self):
        if not hasattr(os, "ftruncate"):
            return
        # The path-based function is handed the descriptor as well.
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)

    def test_lseek(self):
        if not hasattr(os, "lseek"):
            return
        self.check(os.lseek, 0, 0)

    def test_read(self):
        if not hasattr(os, "read"):
            return
        self.check(os.read, 1)

    def test_tcsetpgrpt(self):
        if not hasattr(os, "tcsetpgrp"):
            return
        self.check(os.tcsetpgrp, 0)

    def test_write(self):
        if not hasattr(os, "write"):
            return
        self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001129
Brian Curtin1b9df392010-11-24 20:24:31 +00001130
1131class LinkTests(unittest.TestCase):
1132 def setUp(self):
1133 self.file1 = support.TESTFN
1134 self.file2 = os.path.join(support.TESTFN + "2")
1135
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001136 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001137 for file in (self.file1, self.file2):
1138 if os.path.exists(file):
1139 os.unlink(file)
1140
Brian Curtin1b9df392010-11-24 20:24:31 +00001141 def _test_link(self, file1, file2):
1142 with open(file1, "w") as f1:
1143 f1.write("test")
1144
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001145 with warnings.catch_warnings():
1146 warnings.simplefilter("ignore", DeprecationWarning)
1147 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001148 with open(file1, "r") as f1, open(file2, "r") as f2:
1149 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1150
1151 def test_link(self):
1152 self._test_link(self.file1, self.file2)
1153
1154 def test_link_bytes(self):
1155 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1156 bytes(self.file2, sys.getfilesystemencoding()))
1157
Brian Curtinf498b752010-11-30 15:54:04 +00001158 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001159 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001160 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001161 except UnicodeError:
1162 raise unittest.SkipTest("Unable to encode for this platform.")
1163
Brian Curtinf498b752010-11-30 15:54:04 +00001164 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001165 self.file2 = self.file1 + "2"
1166 self._test_link(self.file1, self.file2)
1167
Thomas Wouters477c8d52006-05-27 19:21:47 +00001168if sys.platform != 'win32':
1169 class Win32ErrorTests(unittest.TestCase):
1170 pass
1171
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001172 class PosixUidGidTests(unittest.TestCase):
1173 if hasattr(os, 'setuid'):
1174 def test_setuid(self):
1175 if os.getuid() != 0:
1176 self.assertRaises(os.error, os.setuid, 0)
1177 self.assertRaises(OverflowError, os.setuid, 1<<32)
1178
1179 if hasattr(os, 'setgid'):
1180 def test_setgid(self):
1181 if os.getuid() != 0:
1182 self.assertRaises(os.error, os.setgid, 0)
1183 self.assertRaises(OverflowError, os.setgid, 1<<32)
1184
1185 if hasattr(os, 'seteuid'):
1186 def test_seteuid(self):
1187 if os.getuid() != 0:
1188 self.assertRaises(os.error, os.seteuid, 0)
1189 self.assertRaises(OverflowError, os.seteuid, 1<<32)
1190
1191 if hasattr(os, 'setegid'):
1192 def test_setegid(self):
1193 if os.getuid() != 0:
1194 self.assertRaises(os.error, os.setegid, 0)
1195 self.assertRaises(OverflowError, os.setegid, 1<<32)
1196
1197 if hasattr(os, 'setreuid'):
1198 def test_setreuid(self):
1199 if os.getuid() != 0:
1200 self.assertRaises(os.error, os.setreuid, 0, 0)
1201 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1202 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001203
1204 def test_setreuid_neg1(self):
1205 # Needs to accept -1. We run this in a subprocess to avoid
1206 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001207 subprocess.check_call([
1208 sys.executable, '-c',
1209 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001210
1211 if hasattr(os, 'setregid'):
1212 def test_setregid(self):
1213 if os.getuid() != 0:
1214 self.assertRaises(os.error, os.setregid, 0, 0)
1215 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1216 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001217
1218 def test_setregid_neg1(self):
1219 # Needs to accept -1. We run this in a subprocess to avoid
1220 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001221 subprocess.check_call([
1222 sys.executable, '-c',
1223 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001224
1225 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001226 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001227 if support.TESTFN_UNENCODABLE:
1228 self.dir = support.TESTFN_UNENCODABLE
1229 else:
1230 self.dir = support.TESTFN
1231 self.bdir = os.fsencode(self.dir)
1232
1233 bytesfn = []
1234 def add_filename(fn):
1235 try:
1236 fn = os.fsencode(fn)
1237 except UnicodeEncodeError:
1238 return
1239 bytesfn.append(fn)
1240 add_filename(support.TESTFN_UNICODE)
1241 if support.TESTFN_UNENCODABLE:
1242 add_filename(support.TESTFN_UNENCODABLE)
1243 if not bytesfn:
1244 self.skipTest("couldn't create any non-ascii filename")
1245
1246 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001247 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001248 try:
1249 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +02001250 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +00001251 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001252 if fn in self.unicodefn:
1253 raise ValueError("duplicate filename")
1254 self.unicodefn.add(fn)
1255 except:
1256 shutil.rmtree(self.dir)
1257 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001258
1259 def tearDown(self):
1260 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001261
1262 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001263 expected = self.unicodefn
1264 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001265 self.assertEqual(found, expected)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001266 # test listdir without arguments
1267 current_directory = os.getcwd()
1268 try:
1269 os.chdir(os.sep)
1270 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1271 finally:
1272 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001273
1274 def test_open(self):
1275 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001276 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +00001277 f.close()
1278
1279 def test_stat(self):
1280 for fn in self.unicodefn:
1281 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001282else:
1283 class PosixUidGidTests(unittest.TestCase):
1284 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001285 class Pep383Tests(unittest.TestCase):
1286 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001287
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows.

    _kill() covers plain integer/signal exit codes; _kill_with_event()
    covers console control events sent to a child in its own process group.
    """

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for at most max_polls * 0.1 seconds. The while/else fires when
        # the loop ends without a break: timeout, or the child already exited.
        polls, max_polls = 0, 100
        while polls < max_polls and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            polls += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Send console control *event* to a child running
        # win_console_handler.py and fail (using *name* in the message) if the
        # child is still running afterwards.
        tagname = "test_os_%s" % uuid.uuid1()
        # One-byte tagged shared mapping; the loop below waits for it to be
        # set to 1, which is taken as "child finished initialization".
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping whatever the outcome of the test.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"), tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        polls, max_polls = 0, 100
        while polls < max_polls and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            polls += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # The loop also ends when the child has already exited; killing a
            # dead process would raise and hide the real failure below.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # status of 0 must not be mistaken for "did not stop".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1402
1403
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic link tests for Windows (file, directory and dangling links)."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions use unittest assertions rather than bare ``assert``
        # statements so they are still enforced when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        # lexists() does not follow the link, so a link whose target has
        # vanished is still found and removed.
        if os.path.lexists(self.filelink):
            os.remove(self.filelink)
        if os.path.lexists(self.dirlink):
            os.rmdir(self.dirlink)
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink, True)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        """Create a "directory" link to a non-existent target."""
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat(link) equals stat(target) but differs from lstat(link).

        The same checks are repeated with the bytes form of *link*, with
        DeprecationWarning silenced for those calls.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so that no name used by the cleanup
        # can be unbound when an early mkdir() fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Removing the tree also removes file1 and the link; the guard
            # keeps a failed setup from raising a second, masking error here.
            if os.path.lexists(level1):
                shutil.rmtree(level1)
1521
Brian Curtind40e6f72010-07-08 21:39:08 +00001522
class FSEncodingTests(unittest.TestCase):
    def test_nop(self):
        # A value that already has the target type comes back unchanged:
        # bytes through fsencode(), str through fsdecode().
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for original in candidates:
            try:
                encoded = os.fsencode(original)
            except UnicodeEncodeError:
                # This name cannot be encoded here; nothing to round-trip.
                continue
            self.assertEqual(os.fsdecode(encoded), original)
Victor Stinnere8d51452010-08-19 01:05:19 +00001536
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001537
Brett Cannonefb00c02012-02-29 18:31:31 -05001538
class DeviceEncodingTests(unittest.TestCase):

    # Evaluated once at class creation, exactly like the decorator argument
    # they feed into.
    _have_codeset = (hasattr(locale, 'nl_langinfo') and
                     hasattr(locale, 'CODESET'))
    _stdin_is_usable_tty = os.isatty(0) and (
        sys.platform.startswith('win') or _have_codeset)

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        self.assertIsNone(os.device_encoding(123456))

    @unittest.skipUnless(
        _stdin_is_usable_tty,
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The reported encoding must exist and be known to the codec registry.
        encoding = os.device_encoding(0)
        self.assertIsNotNone(encoding)
        self.assertTrue(codecs.lookup(encoding))
1552
1553
class PidTests(unittest.TestCase):
    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid on stdout.
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1563
1564
# Adding this TestCase triggered at least two distinct errors on the *nix
# buildbots, so it is unconditionally skipped for now to keep them green.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # Only checks that a non-empty login name is returned.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
1573
1574
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        # Ask for one step above the current value, then read it back.
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                self.skipTest(
                    "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Best-effort restore of the starting value; only EACCES is
            # tolerated, any other OSError propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1597
1598
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket served by an asyncore loop in its own thread.

        The most recently accepted connection is wrapped in a Handler, which
        collects everything the peer sends so a test can read it back with
        ``handler_instance.get_data()``.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that buffers all received bytes."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as a single bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the serving thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop to finish and wait for the thread to exit."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            try:
                while self._active and asyncore.socket_map:
                    # ``with`` releases the lock even when the loop raises
                    # (both handle_error() implementations re-raise).
                    with self._active_lock:
                        asyncore.loop(timeout=0.001, count=1)
            finally:
                # Always close the sockets, including on the error path.
                asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1682
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001683
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are not exercised on these platforms.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries for as long as os.sendfile() fails with EAGAIN or EBUSY and
        re-raises any other OSError (ECONNRESET included). *headers* and
        *trailers* default to empty lists and are only passed on when
        SUPPORT_HEADERS_TRAILERS is true.
        """
        # None instead of a shared mutable default list.
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                # anything else, e.g. ECONNRESET (disconnected), is fatal
                raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            # Compares hashes, not the payloads themselves.
            self.assertEqual(hash(data), hash(expected_data))

        def test_trailers(self):
            TESTFN2 = support.TESTFN + "2"
            # Registered before the file exists so it is removed even when a
            # later step fails; support.unlink() tolerates a missing file.
            self.addCleanup(support.unlink, TESTFN2)
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcde12345")

    if hasattr(os, "SF_NODISKIO"):
        def test_flags(self):
            # EBUSY/EAGAIN are accepted outcomes; anything else is an error.
            try:
                os.sendfile(self.sockno, self.fileno, 0, 4096,
                            flags=os.SF_NODISKIO)
            except OSError as err:
                if err.errno not in (errno.EBUSY, errno.EAGAIN):
                    raise
1855
1856
def supports_extended_attributes():
    """Return True if extended attributes look usable for the tests.

    That requires os.setxattr to exist, a trial setxattr() on a scratch file
    (support.TESTFN) to succeed, and the kernel release not to be a 2.6.x
    version older than 2.6.39. The scratch file is always removed.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string keeps ``\d`` a regex escape, and the dots are escaped so
    # only a literal "2.6." prefix matches.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
1872
1873
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Drive one get/set/remove/list scenario on support.TESTFN.
        # *s* converts each attribute name before it is passed on (str or a
        # bytes converter); **kwargs is forwarded to getxattr, setxattr and
        # removexattr, but not to listxattr.
        fn = support.TESTFN
        open(fn, "wb").close()

        def expect_errno(code, func, *args):
            # func(fn, *args, **kwargs) must raise OSError with errno *code*.
            with self.assertRaises(OSError) as cm:
                func(fn, *args, **kwargs)
            self.assertEqual(cm.exception.errno, code)

        # Nothing is set yet.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))
        baseline = listxattr(fn)
        self.assertIsInstance(baseline, list)

        # Create with an empty value, then replace it.
        setxattr(fn, s("user.test"), b"", **kwargs)
        expected = set(baseline)
        expected.add("user.test")
        self.assertEqual(set(listxattr(fn)), expected)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name, XATTR_REPLACE on a missing one.
        expect_errno(errno.EEXIST, setxattr,
                     s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr,
                     s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(fn)), expected)

        # Removing one attribute leaves the other untouched.
        removexattr(fn, s("user.test"), **kwargs)
        expect_errno(errno.ENODATA, getxattr, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(set(listxattr(fn)), expected)
        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")

        # A 1024-byte value round-trips.
        big_value = b"a"*1024
        setxattr(fn, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), big_value)
        removexattr(fn, s("user.test"), **kwargs)

        # One hundred attributes at once; these names are always str.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(fn, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(fn)), set(baseline).union(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario twice: with str names, then with bytes names.
        def to_ascii_bytes(text):
            return bytes(text, "ascii")
        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(to_ascii_bytes, *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call receives an open file descriptor
        # instead of the path.
        def fd_getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)

        def fd_setxattr(path, *args):
            with open(path, "wb") as fp:
                os.setxattr(fp.fileno(), *args)

        def fd_removexattr(path, *args):
            with open(path, "wb") as fp:
                os.removexattr(fp.fileno(), *args)

        def fd_listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)

        self._check_xattrs(fd_getxattr, fd_setxattr, fd_removexattr,
                           fd_listxattr)
1949
1950
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # Each entry is (callable, *arguments); with DeprecationWarning
        # promoted to an error, every call must raise it.
        calls = (
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        )
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            for call in calls:
                func, arguments = call[0], call[1:]
                self.assertRaises(DeprecationWarning, func, *arguments)

    @support.skip_unless_symlink
    def test_symlink(self):
        # os.symlink() with bytes paths must raise the same warning.
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            self.assertRaises(DeprecationWarning,
                              os.symlink, bytes_name, bytes_name)
1986
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001987
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _get_terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args), skipping when it cannot be queried.

        The running test is skipped on any OSError under win32, and on
        EINVAL/ENOTTY elsewhere; every other OSError propagates.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stderr is discarded so a failing stty (e.g. stdin is not a
            # terminal) does not write its diagnostic into the test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0])) # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2030
2031
@support.reap_threads
def test_main():
    # Hand every test case class of this module to support.run_unittest(),
    # in the order listed here.
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
    )
    support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()