blob: a3b99a3366e182a0cd5474e4173eb69210155a31 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
23try:
24 import threading
25except ImportError:
26 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000027
Mark Dickinson7cf03892010-04-16 13:45:35 +000028# Detect whether we're on a Linux system that uses the (now outdated
29# and unmaintained) linuxthreads threading library. There's an issue
30# when combining linuxthreads with a failed execv call: see
31# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020032if hasattr(sys, 'thread_info') and sys.thread_info.version:
33 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
34else:
35 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000036
Thomas Wouters0e3f5912006-08-11 14:57:12 +000037# Tests creating TESTFN
38class FileTests(unittest.TestCase):
39 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040 if os.path.exists(support.TESTFN):
41 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000042 tearDown = setUp
43
44 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000045 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000046 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000047 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000048
Christian Heimesfdab48e2008-01-20 09:06:41 +000049 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000050 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
51 # We must allocate two consecutive file descriptors, otherwise
52 # it will mess up other file descriptors (perhaps even the three
53 # standard ones).
54 second = os.dup(first)
55 try:
56 retries = 0
57 while second != first + 1:
58 os.close(first)
59 retries += 1
60 if retries > 10:
61 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000062 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000063 first, second = second, os.dup(second)
64 finally:
65 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000066 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000067 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000068 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000069
Benjamin Peterson1cc6df92010-06-30 17:39:45 +000070 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +000071 def test_rename(self):
72 path = support.TESTFN
73 old = sys.getrefcount(path)
74 self.assertRaises(TypeError, os.rename, path, 0)
75 new = sys.getrefcount(path)
76 self.assertEqual(old, new)
77
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000078 def test_read(self):
79 with open(support.TESTFN, "w+b") as fobj:
80 fobj.write(b"spam")
81 fobj.flush()
82 fd = fobj.fileno()
83 os.lseek(fd, 0, 0)
84 s = os.read(fd, 4)
85 self.assertEqual(type(s), bytes)
86 self.assertEqual(s, b"spam")
87
88 def test_write(self):
89 # os.write() accepts bytes- and buffer-like objects but not strings
90 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
91 self.assertRaises(TypeError, os.write, fd, "beans")
92 os.write(fd, b"bacon\n")
93 os.write(fd, bytearray(b"eggs\n"))
94 os.write(fd, memoryview(b"spam\n"))
95 os.close(fd)
96 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +000097 self.assertEqual(fobj.read().splitlines(),
98 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000099
Victor Stinnere0daff12011-03-20 23:36:35 +0100100 def write_windows_console(self, *args):
101 retcode = subprocess.call(args,
102 # use a new console to not flood the test output
103 creationflags=subprocess.CREATE_NEW_CONSOLE,
104 # use a shell to hide the console window (SW_HIDE)
105 shell=True)
106 self.assertEqual(retcode, 0)
107
108 @unittest.skipUnless(sys.platform == 'win32',
109 'test specific to the Windows console')
110 def test_write_windows_console(self):
111 # Issue #11395: the Windows console returns an error (12: not enough
112 # space error) on writing into stdout if stdout mode is binary and the
113 # length is greater than 66,000 bytes (or less, depending on heap
114 # usage).
115 code = "print('x' * 100000)"
116 self.write_windows_console(sys.executable, "-c", code)
117 self.write_windows_console(sys.executable, "-u", "-c", code)
118
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000119 def fdopen_helper(self, *args):
120 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200121 f = os.fdopen(fd, *args)
122 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000123
124 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200125 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
126 os.close(fd)
127
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000128 self.fdopen_helper()
129 self.fdopen_helper('r')
130 self.fdopen_helper('r', 100)
131
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200132
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000133# Test attributes on return values from os.*stat* family.
134class StatAttributeTests(unittest.TestCase):
135 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000136 os.mkdir(support.TESTFN)
137 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000138 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000139 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000140 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000141
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000142 def tearDown(self):
143 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000144 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000145
Antoine Pitrou38425292010-09-21 18:19:07 +0000146 def check_stat_attributes(self, fname):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000147 if not hasattr(os, "stat"):
148 return
149
150 import stat
Antoine Pitrou38425292010-09-21 18:19:07 +0000151 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000152
153 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000154 self.assertEqual(result[stat.ST_SIZE], 3)
155 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000156
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000157 # Make sure all the attributes are there
158 members = dir(result)
159 for name in dir(stat):
160 if name[:3] == 'ST_':
161 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000162 if name.endswith("TIME"):
163 def trunc(x): return int(x)
164 else:
165 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000166 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000167 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000168 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000169
170 try:
171 result[200]
172 self.fail("No exception thrown")
173 except IndexError:
174 pass
175
176 # Make sure that assignment fails
177 try:
178 result.st_mode = 1
179 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000180 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000181 pass
182
183 try:
184 result.st_rdev = 1
185 self.fail("No exception thrown")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000186 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000187 pass
188
189 try:
190 result.parrot = 1
191 self.fail("No exception thrown")
192 except AttributeError:
193 pass
194
195 # Use the stat_result constructor with a too-short tuple.
196 try:
197 result2 = os.stat_result((10,))
198 self.fail("No exception thrown")
199 except TypeError:
200 pass
201
Ezio Melotti42da6632011-03-15 05:18:48 +0200202 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000203 try:
204 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
205 except TypeError:
206 pass
207
Antoine Pitrou38425292010-09-21 18:19:07 +0000208 def test_stat_attributes(self):
209 self.check_stat_attributes(self.fname)
210
211 def test_stat_attributes_bytes(self):
212 try:
213 fname = self.fname.encode(sys.getfilesystemencoding())
214 except UnicodeEncodeError:
215 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100216 with warnings.catch_warnings():
217 warnings.simplefilter("ignore", DeprecationWarning)
218 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000219
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000220 def test_statvfs_attributes(self):
221 if not hasattr(os, "statvfs"):
222 return
223
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000224 try:
225 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000226 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000227 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000228 if e.errno == errno.ENOSYS:
229 return
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000230
231 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000232 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000233
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000234 # Make sure all the attributes are there.
235 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
236 'ffree', 'favail', 'flag', 'namemax')
237 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000238 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000239
240 # Make sure that assignment really fails
241 try:
242 result.f_bfree = 1
243 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000244 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000245 pass
246
247 try:
248 result.parrot = 1
249 self.fail("No exception thrown")
250 except AttributeError:
251 pass
252
253 # Use the constructor with a too-short tuple.
254 try:
255 result2 = os.statvfs_result((10,))
256 self.fail("No exception thrown")
257 except TypeError:
258 pass
259
Ezio Melotti42da6632011-03-15 05:18:48 +0200260 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000261 try:
262 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
263 except TypeError:
264 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000265
Thomas Wouters89f507f2006-12-13 04:49:30 +0000266 def test_utime_dir(self):
267 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000268 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000269 # round to int, because some systems may support sub-second
270 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000271 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
272 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000273 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000274
Brian Curtin52fbea12011-11-06 13:41:17 -0600275 def test_utime_noargs(self):
Brian Curtin0277aa32011-11-06 13:50:15 -0600276 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600277 # second argument. Check that the previous methods of passing
278 # a time tuple or None work in addition to no argument.
279 st = os.stat(support.TESTFN)
280 # Doesn't set anything new, but sets the time tuple way
281 os.utime(support.TESTFN, (st.st_atime, st.st_mtime))
282 # Set to the current time in the old explicit way.
283 os.utime(support.TESTFN, None)
284 st1 = os.stat(support.TESTFN)
285 # Set to the current time in the new way
286 os.utime(support.TESTFN)
287 st2 = os.stat(support.TESTFN)
288 self.assertAlmostEqual(st1.st_mtime, st2.st_mtime, delta=10)
289
Thomas Wouters89f507f2006-12-13 04:49:30 +0000290 # Restrict test to Win32, since there is no guarantee other
291 # systems support centiseconds
292 if sys.platform == 'win32':
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000293 def get_file_system(path):
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000294 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000295 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000296 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000297 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000298 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000299 return buf.value
300
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000301 if get_file_system(support.TESTFN) == "NTFS":
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000302 def test_1565150(self):
303 t1 = 1159195039.25
304 os.utime(self.fname, (t1, t1))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000305 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000306
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000307 def test_large_time(self):
308 t1 = 5000000000 # some day in 2128
309 os.utime(self.fname, (t1, t1))
310 self.assertEqual(os.stat(self.fname).st_mtime, t1)
311
Guido van Rossumd8faa362007-04-27 19:54:29 +0000312 def test_1686475(self):
313 # Verify that an open file can be stat'ed
314 try:
315 os.stat(r"c:\pagefile.sys")
316 except WindowsError as e:
Benjamin Petersonc4fe6f32008-08-19 18:57:56 +0000317 if e.errno == 2: # file does not exist; cannot run test
Guido van Rossumd8faa362007-04-27 19:54:29 +0000318 return
319 self.fail("Could not stat pagefile.sys")
320
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000321from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000322
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000323class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000324 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000325 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000326
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000327 def setUp(self):
328 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000329 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000330 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000331 for key, value in self._reference().items():
332 os.environ[key] = value
333
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000334 def tearDown(self):
335 os.environ.clear()
336 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000337 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000338 os.environb.clear()
339 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000340
Christian Heimes90333392007-11-01 19:08:42 +0000341 def _reference(self):
342 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
343
344 def _empty_mapping(self):
345 os.environ.clear()
346 return os.environ
347
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000348 # Bug 1110478
Martin v. Löwis5510f652005-02-17 21:23:20 +0000349 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000350 os.environ.clear()
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000351 if os.path.exists("/bin/sh"):
352 os.environ.update(HELLO="World")
Brian Curtin810921b2010-10-30 21:24:21 +0000353 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
354 value = popen.read().strip()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000355 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000356
Christian Heimes1a13d592007-11-08 14:16:55 +0000357 def test_os_popen_iter(self):
358 if os.path.exists("/bin/sh"):
Brian Curtin810921b2010-10-30 21:24:21 +0000359 with os.popen(
360 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
361 it = iter(popen)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000362 self.assertEqual(next(it), "line1\n")
363 self.assertEqual(next(it), "line2\n")
364 self.assertEqual(next(it), "line3\n")
Brian Curtin810921b2010-10-30 21:24:21 +0000365 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000366
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000367 # Verify environ keys and values from the OS are of the
368 # correct str type.
369 def test_keyvalue_types(self):
370 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000371 self.assertEqual(type(key), str)
372 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000373
Christian Heimes90333392007-11-01 19:08:42 +0000374 def test_items(self):
375 for key, value in self._reference().items():
376 self.assertEqual(os.environ.get(key), value)
377
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000378 # Issue 7310
379 def test___repr__(self):
380 """Check that the repr() of os.environ looks like environ({...})."""
381 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000382 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
383 '{!r}: {!r}'.format(key, value)
384 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000385
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000386 def test_get_exec_path(self):
387 defpath_list = os.defpath.split(os.pathsep)
388 test_path = ['/monty', '/python', '', '/flying/circus']
389 test_env = {'PATH': os.pathsep.join(test_path)}
390
391 saved_environ = os.environ
392 try:
393 os.environ = dict(test_env)
394 # Test that defaulting to os.environ works.
395 self.assertSequenceEqual(test_path, os.get_exec_path())
396 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
397 finally:
398 os.environ = saved_environ
399
400 # No PATH environment variable
401 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
402 # Empty PATH environment variable
403 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
404 # Supplied PATH environment variable
405 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
406
Victor Stinnerb745a742010-05-18 17:17:23 +0000407 if os.supports_bytes_environ:
408 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000409 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000410 # ignore BytesWarning warning
411 with warnings.catch_warnings(record=True):
412 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000413 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000414 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000415 pass
416 else:
417 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000418
419 # bytes key and/or value
420 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
421 ['abc'])
422 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
423 ['abc'])
424 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
425 ['abc'])
426
427 @unittest.skipUnless(os.supports_bytes_environ,
428 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000429 def test_environb(self):
430 # os.environ -> os.environb
431 value = 'euro\u20ac'
432 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000433 value_bytes = value.encode(sys.getfilesystemencoding(),
434 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000435 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000436 msg = "U+20AC character is not encodable to %s" % (
437 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000438 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000439 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000440 self.assertEqual(os.environ['unicode'], value)
441 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000442
443 # os.environb -> os.environ
444 value = b'\xff'
445 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000446 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000447 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000448 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000449
Tim Petersc4e09402003-04-25 07:11:48 +0000450class WalkTests(unittest.TestCase):
451 """Tests for os.walk()."""
452
453 def test_traversal(self):
454 import os
455 from os.path import join
456
457 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000458 # TESTFN/
459 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000460 # tmp1
461 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000462 # tmp2
463 # SUB11/ no kids
464 # SUB2/ a file kid and a dirsymlink kid
465 # tmp3
466 # link/ a symlink to TESTFN.2
467 # TEST2/
468 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000469 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000470 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000471 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000472 sub2_path = join(walk_path, "SUB2")
473 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000474 tmp2_path = join(sub1_path, "tmp2")
475 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000476 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000477 t2_path = join(support.TESTFN, "TEST2")
478 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Tim Petersc4e09402003-04-25 07:11:48 +0000479
480 # Create stuff.
481 os.makedirs(sub11_path)
482 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000483 os.makedirs(t2_path)
484 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000485 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000486 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
487 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000488 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000489 os.symlink(os.path.abspath(t2_path), link_path)
490 sub2_tree = (sub2_path, ["link"], ["tmp3"])
491 else:
492 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000493
494 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000495 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000496 self.assertEqual(len(all), 4)
497 # We can't know which order SUB1 and SUB2 will appear in.
498 # Not flipped: TESTFN, SUB1, SUB11, SUB2
499 # flipped: TESTFN, SUB2, SUB1, SUB11
500 flipped = all[0][1][0] != "SUB1"
501 all[0][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000502 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000503 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
504 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000505 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000506
507 # Prune the search.
508 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000509 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000510 all.append((root, dirs, files))
511 # Don't descend into SUB1.
512 if 'SUB1' in dirs:
513 # Note that this also mutates the dirs we appended to all!
514 dirs.remove('SUB1')
515 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000516 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
517 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000518
519 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000520 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000521 self.assertEqual(len(all), 4)
522 # We can't know which order SUB1 and SUB2 will appear in.
523 # Not flipped: SUB11, SUB1, SUB2, TESTFN
524 # flipped: SUB2, SUB11, SUB1, TESTFN
525 flipped = all[3][1][0] != "SUB1"
526 all[3][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000527 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000528 self.assertEqual(all[flipped], (sub11_path, [], []))
529 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000530 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000531
Brian Curtin3b4499c2010-12-28 14:31:47 +0000532 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000533 # Walk, following symlinks.
534 for root, dirs, files in os.walk(walk_path, followlinks=True):
535 if root == link_path:
536 self.assertEqual(dirs, [])
537 self.assertEqual(files, ["tmp4"])
538 break
539 else:
540 self.fail("Didn't follow symlink with followlinks=True")
541
542 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000543 # Tear everything down. This is a decent use for bottom-up on
544 # Windows, which doesn't have a recursive delete command. The
545 # (not so) subtlety is that rmdir will fail unless the dir's
546 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000547 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000548 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000549 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000550 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000551 dirname = os.path.join(root, name)
552 if not os.path.islink(dirname):
553 os.rmdir(dirname)
554 else:
555 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000556 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000557
Guido van Rossume7ba4952007-06-06 23:52:48 +0000558class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000559 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000560 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000561
562 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000563 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000564 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
565 os.makedirs(path) # Should work
566 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
567 os.makedirs(path)
568
569 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000570 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000571 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
572 os.makedirs(path)
573 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
574 'dir5', 'dir6')
575 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000576
Terry Reedy5a22b652010-12-02 07:05:56 +0000577 def test_exist_ok_existing_directory(self):
578 path = os.path.join(support.TESTFN, 'dir1')
579 mode = 0o777
580 old_mask = os.umask(0o022)
581 os.makedirs(path, mode)
582 self.assertRaises(OSError, os.makedirs, path, mode)
583 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
584 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
585 os.makedirs(path, mode=mode, exist_ok=True)
586 os.umask(old_mask)
587
588 def test_exist_ok_existing_regular_file(self):
589 base = support.TESTFN
590 path = os.path.join(support.TESTFN, 'dir1')
591 f = open(path, 'w')
592 f.write('abc')
593 f.close()
594 self.assertRaises(OSError, os.makedirs, path)
595 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
596 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
597 os.remove(path)
598
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000599 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000600 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000601 'dir4', 'dir5', 'dir6')
602 # If the tests failed, the bottom-most directory ('../dir6')
603 # may not have been created, so we look for the outermost directory
604 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000605 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000606 path = os.path.dirname(path)
607
608 os.removedirs(path)
609
Guido van Rossume7ba4952007-06-06 23:52:48 +0000610class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000611 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +0200612 with open(os.devnull, 'wb') as f:
613 f.write(b'hello')
614 f.close()
615 with open(os.devnull, 'rb') as f:
616 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000617
Guido van Rossume7ba4952007-06-06 23:52:48 +0000618class URandomTests(unittest.TestCase):
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000619 def test_urandom(self):
620 try:
621 self.assertEqual(len(os.urandom(1)), 1)
622 self.assertEqual(len(os.urandom(10)), 10)
623 self.assertEqual(len(os.urandom(100)), 100)
624 self.assertEqual(len(os.urandom(1000)), 1000)
625 except NotImplementedError:
626 pass
627
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000628@contextlib.contextmanager
629def _execvpe_mockup(defpath=None):
630 """
631 Stubs out execv and execve functions when used as context manager.
632 Records exec calls. The mock execv and execve functions always raise an
633 exception as they would normally never return.
634 """
635 # A list of tuples containing (function name, first arg, args)
636 # of calls to execv or execve that have been made.
637 calls = []
638
639 def mock_execv(name, *args):
640 calls.append(('execv', name, args))
641 raise RuntimeError("execv called")
642
643 def mock_execve(name, *args):
644 calls.append(('execve', name, args))
645 raise OSError(errno.ENOTDIR, "execve called")
646
647 try:
648 orig_execv = os.execv
649 orig_execve = os.execve
650 orig_defpath = os.defpath
651 os.execv = mock_execv
652 os.execve = mock_execve
653 if defpath is not None:
654 os.defpath = defpath
655 yield calls
656 finally:
657 os.execv = orig_execv
658 os.execve = orig_execve
659 os.defpath = orig_defpath
660
Guido van Rossume7ba4952007-06-06 23:52:48 +0000661class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000662 @unittest.skipIf(USING_LINUXTHREADS,
663 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +0000664 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000665 self.assertRaises(OSError, os.execvpe, 'no such app-',
666 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +0000667
Thomas Heller6790d602007-08-30 17:15:14 +0000668 def test_execvpe_with_bad_arglist(self):
669 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
670
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000671 @unittest.skipUnless(hasattr(os, '_execvpe'),
672 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +0000673 def _test_internal_execvpe(self, test_type):
674 program_path = os.sep + 'absolutepath'
675 if test_type is bytes:
676 program = b'executable'
677 fullpath = os.path.join(os.fsencode(program_path), program)
678 native_fullpath = fullpath
679 arguments = [b'progname', 'arg1', 'arg2']
680 else:
681 program = 'executable'
682 arguments = ['progname', 'arg1', 'arg2']
683 fullpath = os.path.join(program_path, program)
684 if os.name != "nt":
685 native_fullpath = os.fsencode(fullpath)
686 else:
687 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000688 env = {'spam': 'beans'}
689
Victor Stinnerb745a742010-05-18 17:17:23 +0000690 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000691 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000692 self.assertRaises(RuntimeError,
693 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000694 self.assertEqual(len(calls), 1)
695 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
696
Victor Stinnerb745a742010-05-18 17:17:23 +0000697 # test os._execvpe() with a relative path:
698 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000699 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000700 self.assertRaises(OSError,
701 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000702 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +0000703 self.assertSequenceEqual(calls[0],
704 ('execve', native_fullpath, (arguments, env)))
705
706 # test os._execvpe() with a relative path:
707 # os.get_exec_path() reads the 'PATH' variable
708 with _execvpe_mockup() as calls:
709 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +0000710 if test_type is bytes:
711 env_path[b'PATH'] = program_path
712 else:
713 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +0000714 self.assertRaises(OSError,
715 os._execvpe, program, arguments, env=env_path)
716 self.assertEqual(len(calls), 1)
717 self.assertSequenceEqual(calls[0],
718 ('execve', native_fullpath, (arguments, env_path)))
719
720 def test_internal_execvpe_str(self):
721 self._test_internal_execvpe(str)
722 if os.name != "nt":
723 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000724
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000725
Thomas Wouters477c8d52006-05-27 19:21:47 +0000726class Win32ErrorTests(unittest.TestCase):
727 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000728 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000729
730 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000731 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000732
733 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000734 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000735
736 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000737 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +0000738 try:
739 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
740 finally:
741 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000742 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000743
744 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000745 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000746
Thomas Wouters477c8d52006-05-27 19:21:47 +0000747 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +0000748 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000749
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000750class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000751 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000752 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
753 #singles.append("close")
754 #We omit close because it doesn'r raise an exception on some platforms
755 def get_single(f):
756 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000757 if hasattr(os, f):
758 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000759 return helper
760 for f in singles:
761 locals()["test_"+f] = get_single(f)
762
Benjamin Peterson7522c742009-01-19 21:00:09 +0000763 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000764 try:
765 f(support.make_bad_fd(), *args)
766 except OSError as e:
767 self.assertEqual(e.errno, errno.EBADF)
768 else:
769 self.fail("%r didn't raise a OSError with a bad file descriptor"
770 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000771
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000772 def test_isatty(self):
773 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000774 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000775
776 def test_closerange(self):
777 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000778 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000779 # Make sure none of the descriptors we are about to close are
780 # currently valid (issue 6542).
781 for i in range(10):
782 try: os.fstat(fd+i)
783 except OSError:
784 pass
785 else:
786 break
787 if i < 2:
788 raise unittest.SkipTest(
789 "Unable to acquire a range of invalid file descriptors")
790 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000791
792 def test_dup2(self):
793 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000794 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000795
796 def test_fchmod(self):
797 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000798 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000799
800 def test_fchown(self):
801 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000802 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000803
804 def test_fpathconf(self):
805 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000806 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000807
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000808 def test_ftruncate(self):
809 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000810 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000811
812 def test_lseek(self):
813 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000814 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000815
816 def test_read(self):
817 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000818 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000819
820 def test_tcsetpgrpt(self):
821 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000822 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000823
824 def test_write(self):
825 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000826 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000827
Brian Curtin1b9df392010-11-24 20:24:31 +0000828
829class LinkTests(unittest.TestCase):
830 def setUp(self):
831 self.file1 = support.TESTFN
832 self.file2 = os.path.join(support.TESTFN + "2")
833
Brian Curtinc0abc4e2010-11-30 23:46:54 +0000834 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +0000835 for file in (self.file1, self.file2):
836 if os.path.exists(file):
837 os.unlink(file)
838
Brian Curtin1b9df392010-11-24 20:24:31 +0000839 def _test_link(self, file1, file2):
840 with open(file1, "w") as f1:
841 f1.write("test")
842
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100843 with warnings.catch_warnings():
844 warnings.simplefilter("ignore", DeprecationWarning)
845 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +0000846 with open(file1, "r") as f1, open(file2, "r") as f2:
847 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
848
849 def test_link(self):
850 self._test_link(self.file1, self.file2)
851
852 def test_link_bytes(self):
853 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
854 bytes(self.file2, sys.getfilesystemencoding()))
855
Brian Curtinf498b752010-11-30 15:54:04 +0000856 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +0000857 try:
Brian Curtinf498b752010-11-30 15:54:04 +0000858 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +0000859 except UnicodeError:
860 raise unittest.SkipTest("Unable to encode for this platform.")
861
Brian Curtinf498b752010-11-30 15:54:04 +0000862 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +0000863 self.file2 = self.file1 + "2"
864 self._test_link(self.file1, self.file2)
865
Thomas Wouters477c8d52006-05-27 19:21:47 +0000866if sys.platform != 'win32':
867 class Win32ErrorTests(unittest.TestCase):
868 pass
869
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000870 class PosixUidGidTests(unittest.TestCase):
871 if hasattr(os, 'setuid'):
872 def test_setuid(self):
873 if os.getuid() != 0:
874 self.assertRaises(os.error, os.setuid, 0)
875 self.assertRaises(OverflowError, os.setuid, 1<<32)
876
877 if hasattr(os, 'setgid'):
878 def test_setgid(self):
879 if os.getuid() != 0:
880 self.assertRaises(os.error, os.setgid, 0)
881 self.assertRaises(OverflowError, os.setgid, 1<<32)
882
883 if hasattr(os, 'seteuid'):
884 def test_seteuid(self):
885 if os.getuid() != 0:
886 self.assertRaises(os.error, os.seteuid, 0)
887 self.assertRaises(OverflowError, os.seteuid, 1<<32)
888
889 if hasattr(os, 'setegid'):
890 def test_setegid(self):
891 if os.getuid() != 0:
892 self.assertRaises(os.error, os.setegid, 0)
893 self.assertRaises(OverflowError, os.setegid, 1<<32)
894
895 if hasattr(os, 'setreuid'):
896 def test_setreuid(self):
897 if os.getuid() != 0:
898 self.assertRaises(os.error, os.setreuid, 0, 0)
899 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
900 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000901
902 def test_setreuid_neg1(self):
903 # Needs to accept -1. We run this in a subprocess to avoid
904 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000905 subprocess.check_call([
906 sys.executable, '-c',
907 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000908
909 if hasattr(os, 'setregid'):
910 def test_setregid(self):
911 if os.getuid() != 0:
912 self.assertRaises(os.error, os.setregid, 0, 0)
913 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
914 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000915
916 def test_setregid_neg1(self):
917 # Needs to accept -1. We run this in a subprocess to avoid
918 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000919 subprocess.check_call([
920 sys.executable, '-c',
921 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +0000922
923 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +0000924 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000925 if support.TESTFN_UNENCODABLE:
926 self.dir = support.TESTFN_UNENCODABLE
927 else:
928 self.dir = support.TESTFN
929 self.bdir = os.fsencode(self.dir)
930
931 bytesfn = []
932 def add_filename(fn):
933 try:
934 fn = os.fsencode(fn)
935 except UnicodeEncodeError:
936 return
937 bytesfn.append(fn)
938 add_filename(support.TESTFN_UNICODE)
939 if support.TESTFN_UNENCODABLE:
940 add_filename(support.TESTFN_UNENCODABLE)
941 if not bytesfn:
942 self.skipTest("couldn't create any non-ascii filename")
943
944 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +0000945 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000946 try:
947 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +0200948 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +0000949 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000950 if fn in self.unicodefn:
951 raise ValueError("duplicate filename")
952 self.unicodefn.add(fn)
953 except:
954 shutil.rmtree(self.dir)
955 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +0000956
957 def tearDown(self):
958 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +0000959
960 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000961 expected = self.unicodefn
962 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000963 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +0000964
965 def test_open(self):
966 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +0200967 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +0000968 f.close()
969
970 def test_stat(self):
971 for fn in self.unicodefn:
972 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000973else:
974 class PosixUidGidTests(unittest.TestCase):
975 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +0000976 class Pep383Tests(unittest.TestCase):
977 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000978
Brian Curtineb24d742010-04-12 17:16:38 +0000979@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
980class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +0000981 def _kill(self, sig):
982 # Start sys.executable as a subprocess and communicate from the
983 # subprocess to the parent that the interpreter is ready. When it
984 # becomes ready, send *sig* via os.kill to the subprocess and check
985 # that the return code is equal to *sig*.
986 import ctypes
987 from ctypes import wintypes
988 import msvcrt
989
990 # Since we can't access the contents of the process' stdout until the
991 # process has exited, use PeekNamedPipe to see what's inside stdout
992 # without waiting. This is done so we can tell that the interpreter
993 # is started and running at a point where it could handle a signal.
994 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
995 PeekNamedPipe.restype = wintypes.BOOL
996 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
997 ctypes.POINTER(ctypes.c_char), # stdout buf
998 wintypes.DWORD, # Buffer size
999 ctypes.POINTER(wintypes.DWORD), # bytes read
1000 ctypes.POINTER(wintypes.DWORD), # bytes avail
1001 ctypes.POINTER(wintypes.DWORD)) # bytes left
1002 msg = "running"
1003 proc = subprocess.Popen([sys.executable, "-c",
1004 "import sys;"
1005 "sys.stdout.write('{}');"
1006 "sys.stdout.flush();"
1007 "input()".format(msg)],
1008 stdout=subprocess.PIPE,
1009 stderr=subprocess.PIPE,
1010 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001011 self.addCleanup(proc.stdout.close)
1012 self.addCleanup(proc.stderr.close)
1013 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001014
1015 count, max = 0, 100
1016 while count < max and proc.poll() is None:
1017 # Create a string buffer to store the result of stdout from the pipe
1018 buf = ctypes.create_string_buffer(len(msg))
1019 # Obtain the text currently in proc.stdout
1020 # Bytes read/avail/left are left as NULL and unused
1021 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1022 buf, ctypes.sizeof(buf), None, None, None)
1023 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1024 if buf.value:
1025 self.assertEqual(msg, buf.value.decode())
1026 break
1027 time.sleep(0.1)
1028 count += 1
1029 else:
1030 self.fail("Did not receive communication from the subprocess")
1031
Brian Curtineb24d742010-04-12 17:16:38 +00001032 os.kill(proc.pid, sig)
1033 self.assertEqual(proc.wait(), sig)
1034
1035 def test_kill_sigterm(self):
1036 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001037 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001038
1039 def test_kill_int(self):
1040 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001041 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001042
1043 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001044 tagname = "test_os_%s" % uuid.uuid1()
1045 m = mmap.mmap(-1, 1, tagname)
1046 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001047 # Run a script which has console control handling enabled.
1048 proc = subprocess.Popen([sys.executable,
1049 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001050 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001051 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1052 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001053 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001054 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001055 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001056 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001057 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001058 count += 1
1059 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001060 # Forcefully kill the process if we weren't able to signal it.
1061 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001062 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001063 os.kill(proc.pid, event)
1064 # proc.send_signal(event) could also be done here.
1065 # Allow time for the signal to be passed and the process to exit.
1066 time.sleep(0.5)
1067 if not proc.poll():
1068 # Forcefully kill the process if we weren't able to signal it.
1069 os.kill(proc.pid, signal.SIGINT)
1070 self.fail("subprocess did not stop on {}".format(name))
1071
1072 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1073 def test_CTRL_C_EVENT(self):
1074 from ctypes import wintypes
1075 import ctypes
1076
1077 # Make a NULL value by creating a pointer with no argument.
1078 NULL = ctypes.POINTER(ctypes.c_int)()
1079 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1080 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1081 wintypes.BOOL)
1082 SetConsoleCtrlHandler.restype = wintypes.BOOL
1083
1084 # Calling this with NULL and FALSE causes the calling process to
1085 # handle CTRL+C, rather than ignore it. This property is inherited
1086 # by subprocesses.
1087 SetConsoleCtrlHandler(NULL, 0)
1088
1089 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1090
1091 def test_CTRL_BREAK_EVENT(self):
1092 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1093
1094
Brian Curtind40e6f72010-07-08 21:39:08 +00001095@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001096@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001097class Win32SymlinkTests(unittest.TestCase):
1098 filelink = 'filelinktest'
1099 filelink_target = os.path.abspath(__file__)
1100 dirlink = 'dirlinktest'
1101 dirlink_target = os.path.dirname(filelink_target)
1102 missing_link = 'missing link'
1103
1104 def setUp(self):
1105 assert os.path.exists(self.dirlink_target)
1106 assert os.path.exists(self.filelink_target)
1107 assert not os.path.exists(self.dirlink)
1108 assert not os.path.exists(self.filelink)
1109 assert not os.path.exists(self.missing_link)
1110
1111 def tearDown(self):
1112 if os.path.exists(self.filelink):
1113 os.remove(self.filelink)
1114 if os.path.exists(self.dirlink):
1115 os.rmdir(self.dirlink)
1116 if os.path.lexists(self.missing_link):
1117 os.remove(self.missing_link)
1118
1119 def test_directory_link(self):
1120 os.symlink(self.dirlink_target, self.dirlink)
1121 self.assertTrue(os.path.exists(self.dirlink))
1122 self.assertTrue(os.path.isdir(self.dirlink))
1123 self.assertTrue(os.path.islink(self.dirlink))
1124 self.check_stat(self.dirlink, self.dirlink_target)
1125
1126 def test_file_link(self):
1127 os.symlink(self.filelink_target, self.filelink)
1128 self.assertTrue(os.path.exists(self.filelink))
1129 self.assertTrue(os.path.isfile(self.filelink))
1130 self.assertTrue(os.path.islink(self.filelink))
1131 self.check_stat(self.filelink, self.filelink_target)
1132
1133 def _create_missing_dir_link(self):
1134 'Create a "directory" link to a non-existent target'
1135 linkname = self.missing_link
1136 if os.path.lexists(linkname):
1137 os.remove(linkname)
1138 target = r'c:\\target does not exist.29r3c740'
1139 assert not os.path.exists(target)
1140 target_is_dir = True
1141 os.symlink(target, linkname, target_is_dir)
1142
1143 def test_remove_directory_link_to_missing_target(self):
1144 self._create_missing_dir_link()
1145 # For compatibility with Unix, os.remove will check the
1146 # directory status and call RemoveDirectory if the symlink
1147 # was created with target_is_dir==True.
1148 os.remove(self.missing_link)
1149
1150 @unittest.skip("currently fails; consider for improvement")
1151 def test_isdir_on_directory_link_to_missing_target(self):
1152 self._create_missing_dir_link()
1153 # consider having isdir return true for directory links
1154 self.assertTrue(os.path.isdir(self.missing_link))
1155
1156 @unittest.skip("currently fails; consider for improvement")
1157 def test_rmdir_on_directory_link_to_missing_target(self):
1158 self._create_missing_dir_link()
1159 # consider allowing rmdir to remove directory links
1160 os.rmdir(self.missing_link)
1161
1162 def check_stat(self, link, target):
1163 self.assertEqual(os.stat(link), os.stat(target))
1164 self.assertNotEqual(os.lstat(link), os.stat(link))
1165
Brian Curtind25aef52011-06-13 15:16:04 -05001166 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001167 with warnings.catch_warnings():
1168 warnings.simplefilter("ignore", DeprecationWarning)
1169 self.assertEqual(os.stat(bytes_link), os.stat(target))
1170 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001171
1172 def test_12084(self):
1173 level1 = os.path.abspath(support.TESTFN)
1174 level2 = os.path.join(level1, "level2")
1175 level3 = os.path.join(level2, "level3")
1176 try:
1177 os.mkdir(level1)
1178 os.mkdir(level2)
1179 os.mkdir(level3)
1180
1181 file1 = os.path.abspath(os.path.join(level1, "file1"))
1182
1183 with open(file1, "w") as f:
1184 f.write("file1")
1185
1186 orig_dir = os.getcwd()
1187 try:
1188 os.chdir(level2)
1189 link = os.path.join(level2, "link")
1190 os.symlink(os.path.relpath(file1), "link")
1191 self.assertIn("link", os.listdir(os.getcwd()))
1192
1193 # Check os.stat calls from the same dir as the link
1194 self.assertEqual(os.stat(file1), os.stat("link"))
1195
1196 # Check os.stat calls from a dir below the link
1197 os.chdir(level1)
1198 self.assertEqual(os.stat(file1),
1199 os.stat(os.path.relpath(link)))
1200
1201 # Check os.stat calls from a dir above the link
1202 os.chdir(level3)
1203 self.assertEqual(os.stat(file1),
1204 os.stat(os.path.relpath(link)))
1205 finally:
1206 os.chdir(orig_dir)
1207 except OSError as err:
1208 self.fail(err)
1209 finally:
1210 os.remove(file1)
1211 shutil.rmtree(level1)
1212
Brian Curtind40e6f72010-07-08 21:39:08 +00001213
Victor Stinnere8d51452010-08-19 01:05:19 +00001214class FSEncodingTests(unittest.TestCase):
1215 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001216 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1217 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001218
Victor Stinnere8d51452010-08-19 01:05:19 +00001219 def test_identity(self):
1220 # assert fsdecode(fsencode(x)) == x
1221 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1222 try:
1223 bytesfn = os.fsencode(fn)
1224 except UnicodeEncodeError:
1225 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001226 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001227
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001228
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001229class PidTests(unittest.TestCase):
1230 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1231 def test_getppid(self):
1232 p = subprocess.Popen([sys.executable, '-c',
1233 'import os; print(os.getppid())'],
1234 stdout=subprocess.PIPE)
1235 stdout, _ = p.communicate()
1236 # We are the parent of our subprocess
1237 self.assertEqual(int(stdout), os.getpid())
1238
1239
Brian Curtin0151b8e2010-09-24 13:43:43 +00001240# The introduction of this TestCase caused at least two different errors on
1241# *nix buildbots. Temporarily skip this to let the buildbots move along.
1242@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001243@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1244class LoginTests(unittest.TestCase):
1245 def test_getlogin(self):
1246 user_name = os.getlogin()
1247 self.assertNotEqual(len(user_name), 0)
1248
1249
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001250@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1251 "needs os.getpriority and os.setpriority")
1252class ProgramPriorityTests(unittest.TestCase):
1253 """Tests for os.getpriority() and os.setpriority()."""
1254
1255 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001256
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001257 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1258 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1259 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001260 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1261 if base >= 19 and new_prio <= 19:
1262 raise unittest.SkipTest(
1263 "unable to reliably test setpriority at current nice level of %s" % base)
1264 else:
1265 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001266 finally:
1267 try:
1268 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1269 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001270 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001271 raise
1272
1273
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001274if threading is not None:
1275 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001276
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001277 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001278
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001279 def __init__(self, conn):
1280 asynchat.async_chat.__init__(self, conn)
1281 self.in_buffer = []
1282 self.closed = False
1283 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001284
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001285 def handle_read(self):
1286 data = self.recv(4096)
1287 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001288
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001289 def get_data(self):
1290 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001291
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001292 def handle_close(self):
1293 self.close()
1294 self.closed = True
1295
1296 def handle_error(self):
1297 raise
1298
1299 def __init__(self, address):
1300 threading.Thread.__init__(self)
1301 asyncore.dispatcher.__init__(self)
1302 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1303 self.bind(address)
1304 self.listen(5)
1305 self.host, self.port = self.socket.getsockname()[:2]
1306 self.handler_instance = None
1307 self._active = False
1308 self._active_lock = threading.Lock()
1309
1310 # --- public API
1311
1312 @property
1313 def running(self):
1314 return self._active
1315
1316 def start(self):
1317 assert not self.running
1318 self.__flag = threading.Event()
1319 threading.Thread.start(self)
1320 self.__flag.wait()
1321
1322 def stop(self):
1323 assert self.running
1324 self._active = False
1325 self.join()
1326
1327 def wait(self):
1328 # wait for handler connection to be closed, then stop the server
1329 while not getattr(self.handler_instance, "closed", False):
1330 time.sleep(0.001)
1331 self.stop()
1332
1333 # --- internals
1334
1335 def run(self):
1336 self._active = True
1337 self.__flag.set()
1338 while self._active and asyncore.socket_map:
1339 self._active_lock.acquire()
1340 asyncore.loop(timeout=0.001, count=1)
1341 self._active_lock.release()
1342 asyncore.close_all()
1343
1344 def handle_accept(self):
1345 conn, addr = self.accept()
1346 self.handler_instance = self.Handler(conn)
1347
1348 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001349 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001350 handle_read = handle_connect
1351
1352 def writable(self):
1353 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001354
1355 def handle_error(self):
1356 raise
1357
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001358
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001359@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001360@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1361class TestSendfile(unittest.TestCase):
1362
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001363 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001364 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001365 not sys.platform.startswith("solaris") and \
1366 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001367
1368 @classmethod
1369 def setUpClass(cls):
1370 with open(support.TESTFN, "wb") as f:
1371 f.write(cls.DATA)
1372
1373 @classmethod
1374 def tearDownClass(cls):
1375 support.unlink(support.TESTFN)
1376
1377 def setUp(self):
1378 self.server = SendfileTestServer((support.HOST, 0))
1379 self.server.start()
1380 self.client = socket.socket()
1381 self.client.connect((self.server.host, self.server.port))
1382 self.client.settimeout(1)
1383 # synchronize by waiting for "220 ready" response
1384 self.client.recv(1024)
1385 self.sockno = self.client.fileno()
1386 self.file = open(support.TESTFN, 'rb')
1387 self.fileno = self.file.fileno()
1388
1389 def tearDown(self):
1390 self.file.close()
1391 self.client.close()
1392 if self.server.running:
1393 self.server.stop()
1394
1395 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1396 """A higher level wrapper representing how an application is
1397 supposed to use sendfile().
1398 """
1399 while 1:
1400 try:
1401 if self.SUPPORT_HEADERS_TRAILERS:
1402 return os.sendfile(sock, file, offset, nbytes, headers,
1403 trailers)
1404 else:
1405 return os.sendfile(sock, file, offset, nbytes)
1406 except OSError as err:
1407 if err.errno == errno.ECONNRESET:
1408 # disconnected
1409 raise
1410 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1411 # we have to retry send data
1412 continue
1413 else:
1414 raise
1415
1416 def test_send_whole_file(self):
1417 # normal send
1418 total_sent = 0
1419 offset = 0
1420 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001421 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001422 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1423 if sent == 0:
1424 break
1425 offset += sent
1426 total_sent += sent
1427 self.assertTrue(sent <= nbytes)
1428 self.assertEqual(offset, total_sent)
1429
1430 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001431 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001432 self.client.close()
1433 self.server.wait()
1434 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001435 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001436 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001437
1438 def test_send_at_certain_offset(self):
1439 # start sending a file at a certain offset
1440 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001441 offset = len(self.DATA) // 2
1442 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001443 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001444 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001445 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1446 if sent == 0:
1447 break
1448 offset += sent
1449 total_sent += sent
1450 self.assertTrue(sent <= nbytes)
1451
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001452 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001453 self.client.close()
1454 self.server.wait()
1455 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001456 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001457 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001458 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001459 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001460
1461 def test_offset_overflow(self):
1462 # specify an offset > file size
1463 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001464 try:
1465 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1466 except OSError as e:
1467 # Solaris can raise EINVAL if offset >= file length, ignore.
1468 if e.errno != errno.EINVAL:
1469 raise
1470 else:
1471 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001472 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001473 self.client.close()
1474 self.server.wait()
1475 data = self.server.handler_instance.get_data()
1476 self.assertEqual(data, b'')
1477
1478 def test_invalid_offset(self):
1479 with self.assertRaises(OSError) as cm:
1480 os.sendfile(self.sockno, self.fileno, -1, 4096)
1481 self.assertEqual(cm.exception.errno, errno.EINVAL)
1482
1483 # --- headers / trailers tests
1484
1485 if SUPPORT_HEADERS_TRAILERS:
1486
1487 def test_headers(self):
1488 total_sent = 0
1489 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1490 headers=[b"x" * 512])
1491 total_sent += sent
1492 offset = 4096
1493 nbytes = 4096
1494 while 1:
1495 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1496 offset, nbytes)
1497 if sent == 0:
1498 break
1499 total_sent += sent
1500 offset += sent
1501
1502 expected_data = b"x" * 512 + self.DATA
1503 self.assertEqual(total_sent, len(expected_data))
1504 self.client.close()
1505 self.server.wait()
1506 data = self.server.handler_instance.get_data()
1507 self.assertEqual(hash(data), hash(expected_data))
1508
1509 def test_trailers(self):
1510 TESTFN2 = support.TESTFN + "2"
Brett Cannonb6376802011-03-15 17:38:22 -04001511 with open(TESTFN2, 'wb') as f:
1512 f.write(b"abcde")
1513 with open(TESTFN2, 'rb')as f:
1514 self.addCleanup(os.remove, TESTFN2)
1515 os.sendfile(self.sockno, f.fileno(), 0, 4096,
1516 trailers=[b"12345"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001517 self.client.close()
1518 self.server.wait()
1519 data = self.server.handler_instance.get_data()
1520 self.assertEqual(data, b"abcde12345")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001521
1522 if hasattr(os, "SF_NODISKIO"):
1523 def test_flags(self):
1524 try:
1525 os.sendfile(self.sockno, self.fileno, 0, 4096,
1526 flags=os.SF_NODISKIO)
1527 except OSError as err:
1528 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1529 raise
1530
1531
Benjamin Peterson799bd802011-08-31 22:15:17 -04001532def supports_extended_attributes():
1533 if not hasattr(os, "setxattr"):
1534 return False
1535 try:
1536 with open(support.TESTFN, "wb") as fp:
1537 try:
1538 os.fsetxattr(fp.fileno(), b"user.test", b"")
1539 except OSError as e:
1540 if e.errno != errno.ENOTSUP:
1541 raise
1542 return False
1543 finally:
1544 support.unlink(support.TESTFN)
1545 # Kernels < 2.6.39 don't respect setxattr flags.
1546 kernel_version = platform.release()
1547 m = re.match("2.6.(\d{1,2})", kernel_version)
1548 return m is None or int(m.group(1)) >= 39
1549
1550
1551@unittest.skipUnless(supports_extended_attributes(),
1552 "no non-broken extended attribute support")
1553class ExtendedAttributeTests(unittest.TestCase):
1554
1555 def tearDown(self):
1556 support.unlink(support.TESTFN)
1557
1558 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
1559 fn = support.TESTFN
1560 open(fn, "wb").close()
1561 with self.assertRaises(OSError) as cm:
1562 getxattr(fn, s("user.test"))
1563 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001564 init_xattr = listxattr(fn)
1565 self.assertIsInstance(init_xattr, list)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001566 setxattr(fn, s("user.test"), b"")
Victor Stinnerf12e5062011-10-16 22:12:03 +02001567 xattr = set(init_xattr)
1568 xattr.add("user.test")
1569 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001570 self.assertEqual(getxattr(fn, b"user.test"), b"")
1571 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE)
1572 self.assertEqual(getxattr(fn, b"user.test"), b"hello")
1573 with self.assertRaises(OSError) as cm:
1574 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE)
1575 self.assertEqual(cm.exception.errno, errno.EEXIST)
1576 with self.assertRaises(OSError) as cm:
1577 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE)
1578 self.assertEqual(cm.exception.errno, errno.ENODATA)
1579 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001580 xattr.add("user.test2")
1581 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001582 removexattr(fn, s("user.test"))
1583 with self.assertRaises(OSError) as cm:
1584 getxattr(fn, s("user.test"))
1585 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001586 xattr.remove("user.test")
1587 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001588 self.assertEqual(getxattr(fn, s("user.test2")), b"foo")
1589 setxattr(fn, s("user.test"), b"a"*1024)
1590 self.assertEqual(getxattr(fn, s("user.test")), b"a"*1024)
1591 removexattr(fn, s("user.test"))
1592 many = sorted("user.test{}".format(i) for i in range(100))
1593 for thing in many:
1594 setxattr(fn, thing, b"x")
Victor Stinnerf12e5062011-10-16 22:12:03 +02001595 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04001596
1597 def _check_xattrs(self, *args):
1598 def make_bytes(s):
1599 return bytes(s, "ascii")
1600 self._check_xattrs_str(str, *args)
1601 support.unlink(support.TESTFN)
1602 self._check_xattrs_str(make_bytes, *args)
1603
1604 def test_simple(self):
1605 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
1606 os.listxattr)
1607
1608 def test_lpath(self):
1609 self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
1610 os.llistxattr)
1611
1612 def test_fds(self):
1613 def getxattr(path, *args):
1614 with open(path, "rb") as fp:
1615 return os.fgetxattr(fp.fileno(), *args)
1616 def setxattr(path, *args):
1617 with open(path, "wb") as fp:
1618 os.fsetxattr(fp.fileno(), *args)
1619 def removexattr(path, *args):
1620 with open(path, "wb") as fp:
1621 os.fremovexattr(fp.fileno(), *args)
1622 def listxattr(path, *args):
1623 with open(path, "rb") as fp:
1624 return os.flistxattr(fp.fileno(), *args)
1625 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1626
1627
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001628@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1629class Win32DeprecatedBytesAPI(unittest.TestCase):
1630 def test_deprecated(self):
1631 import nt
1632 filename = os.fsencode(support.TESTFN)
1633 with warnings.catch_warnings():
1634 warnings.simplefilter("error", DeprecationWarning)
1635 for func, *args in (
1636 (nt._getfullpathname, filename),
1637 (nt._isdir, filename),
1638 (os.access, filename, os.R_OK),
1639 (os.chdir, filename),
1640 (os.chmod, filename, 0o777),
1641 (os.link, filename, filename),
1642 (os.listdir, filename),
1643 (os.lstat, filename),
1644 (os.mkdir, filename),
1645 (os.open, filename, os.O_RDONLY),
1646 (os.rename, filename, filename),
1647 (os.rmdir, filename),
1648 (os.startfile, filename),
1649 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001650 (os.unlink, filename),
1651 (os.utime, filename),
1652 ):
1653 self.assertRaises(DeprecationWarning, func, *args)
1654
Victor Stinner28216442011-11-16 00:34:44 +01001655 @support.skip_unless_symlink
1656 def test_symlink(self):
1657 filename = os.fsencode(support.TESTFN)
1658 with warnings.catch_warnings():
1659 warnings.simplefilter("error", DeprecationWarning)
1660 self.assertRaises(DeprecationWarning,
1661 os.symlink, filename, filename)
1662
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001663
Antoine Pitrouf26ad712011-07-15 23:00:56 +02001664@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00001665def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001666 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00001667 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001668 StatAttributeTests,
1669 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001670 WalkTests,
1671 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001672 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00001673 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00001674 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001675 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001676 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00001677 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00001678 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001679 Win32KillTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00001680 Win32SymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00001681 FSEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001682 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001683 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00001684 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001685 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001686 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04001687 ExtendedAttributeTests,
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001688 Win32DeprecatedBytesAPI,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001689 )
Fred Drake2e2be372001-09-20 21:33:42 +00001690
1691if __name__ == "__main__":
1692 test_main()