blob: e78db48afe049d4305ee1ff9afed383d52042aca [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
23try:
24 import threading
25except ImportError:
26 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000027
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = False
if hasattr(sys, 'thread_info'):
    # A falsy version string counts as "not linuxthreads".
    USING_LINUXTHREADS = (sys.thread_info.version or "").startswith(
        "linuxthreads")
Brian Curtineb24d742010-04-12 17:16:38 +000036
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level file functions of os, operating on support.TESTFN."""

    def setUp(self):
        # Remove any stale TESTFN; also used as tearDown (see alias below).
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the refcount of its
        # path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails,
            # so that tearDown can still unlink TESTFN.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command given by args and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the descriptor with os.fdopen(*args)
        # and close the resulting file object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)
131
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200132
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on the result objects of os.stat()/os.statvfs() and on os.utime().

    Each test runs against a directory support.TESTFN containing a single
    three-byte file "f1" (self.fname).
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        # Shared body of the stat tests; fname may be str or bytes.
        if not hasattr(os, "stat"):
            self.skipTest("os.stat() is not available")

        import stat
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] != 'ST_':
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # The tuple interface only carries whole seconds.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            self.skipTest("os.statvfs() is not available")

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest("os.statvfs() failed with ENOSYS")
            # Any other error is a real failure; swallowing it would only
            # resurface below as an unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for index, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[index])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def test_utime_noargs(self):
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st = os.stat(support.TESTFN)
        # Doesn't set anything new, but sets the time tuple way
        os.utime(support.TESTFN, (st.st_atime, st.st_mtime))
        # Set to the current time in the old explicit way.
        os.utime(support.TESTFN, None)
        st1 = os.stat(support.TESTFN)
        # Set to the current time in the new way
        os.utime(support.TESTFN)
        st2 = os.stat(support.TESTFN)
        self.assertAlmostEqual(st1.st_mtime, st2.st_mtime, delta=10)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Return the file system name reported by GetVolumeInformationW
            # for the drive holding path (None if the call fails).
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        # NOTE(review): the nesting of the tests below was reconstructed;
        # confirm which of them are meant to be NTFS-only.
        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
320
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000321from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000322
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment(s), then install the reference items.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        os.environ.update(self._reference())

    def tearDown(self):
        # Put back exactly what setUp saved.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    def test_update2(self):
        os.environ.clear()
        if not os.path.exists("/bin/sh"):
            return
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as pipe:
            output = pipe.read().strip()
            self.assertEqual(output, "World")

    def test_os_popen_iter(self):
        if not os.path.exists("/bin/sh"):
            return
        with os.popen(
            "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, setting in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(setting), str)

    def test_items(self):
        for name, expected in self._reference().items():
            self.assertEqual(os.environ.get(name), expected)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        pairs = ', '.join('{!r}: {!r}'.format(name, setting)
                          for name, setting in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(pairs))

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        search_dirs = ['/monty', '/python', '', '/flying/circus']
        fake_env = {'PATH': os.pathsep.join(search_dirs)}

        # Test that defaulting to os.environ works.
        real_environ = os.environ
        os.environ = dict(fake_env)
        try:
            self.assertSequenceEqual(search_dirs, os.get_exec_path())
            self.assertSequenceEqual(search_dirs,
                                     os.get_exec_path(env=None))
        finally:
            os.environ = real_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(search_dirs, os.get_exec_path(fake_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        for bytes_env in ({b'PATH': b'abc'},
                          {b'PATH': 'abc'},
                          {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(bytes_env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            encoded = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], encoded)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        decoded = raw.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], decoded)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        delete = os.environ.__delitem__
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            self.assertRaises(ValueError, delete, 'x' * 50000)
        else:
            # "=" is not allowed in a variable name
            self.assertRaises(OSError, delete, 'key=')
Victor Stinner60b385e2011-11-22 22:01:28 +0100463
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TESTFN/TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        if support.can_symlink():
            if os.name == 'nt':
                def symlink_to_dir(src, dest):
                    os.symlink(src, dest, True)
            else:
                symlink_to_dir = os.symlink
            symlink_to_dir(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is removed as a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000576
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh support.TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # The umask is process-wide: restore it even if a check fails.
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # tearDown uses os.removedirs(), which cannot remove a regular
            # file, so delete it here whatever the outcome.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
628
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull device."""

    def test_devnull(self):
        # Data written to os.devnull is accepted; the with block closes
        # the file, so no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        # Reading from os.devnull yields nothing.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000636
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom(self):
        # os.urandom(n) must return exactly n bytes; a platform without
        # an implementation (NotImplementedError) is tolerated.
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            pass
646
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000647@contextlib.contextmanager
648def _execvpe_mockup(defpath=None):
649 """
650 Stubs out execv and execve functions when used as context manager.
651 Records exec calls. The mock execv and execve functions always raise an
652 exception as they would normally never return.
653 """
654 # A list of tuples containing (function name, first arg, args)
655 # of calls to execv or execve that have been made.
656 calls = []
657
658 def mock_execv(name, *args):
659 calls.append(('execv', name, args))
660 raise RuntimeError("execv called")
661
662 def mock_execve(name, *args):
663 calls.append(('execve', name, args))
664 raise OSError(errno.ENOTDIR, "execve called")
665
666 try:
667 orig_execv = os.execv
668 orig_execve = os.execve
669 orig_defpath = os.defpath
670 os.execv = mock_execv
671 os.execve = mock_execve
672 if defpath is not None:
673 os.defpath = defpath
674 yield calls
675 finally:
676 os.execv = orig_execv
677 os.execve = orig_execve
678 os.defpath = orig_defpath
679
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list is rejected with ValueError.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with a *test_type* (str or bytes) program name.

        The exec functions are replaced by _execvpe_mockup(), so nothing is
        really executed; only the recorded calls are inspected.
        """
        search_dir = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as recorded:
            self.assertRaises(RuntimeError,
                              os._execvpe, fullpath, arguments)
            self.assertEqual(len(recorded), 1)
            self.assertEqual(recorded[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as recorded:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env)
            self.assertEqual(len(recorded), 1)
            expected = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(recorded[0], expected)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as recorded:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = search_dir
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(recorded), 1)
            expected = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(recorded[0], expected)

    def test_internal_execvpe_str(self):
        # The bytes variant is only exercised outside of Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000743
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000744
Thomas Wouters477c8d52006-05-27 19:21:47 +0000745class Win32ErrorTests(unittest.TestCase):
746 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000747 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000748
749 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000750 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000751
752 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000753 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000754
755 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000756 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +0000757 try:
758 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
759 finally:
760 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000761 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000762
763 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000764 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000765
Thomas Wouters477c8d52006-05-27 19:21:47 +0000766 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +0000767 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000768
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000769class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000770 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000771 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
772 #singles.append("close")
773 #We omit close because it doesn'r raise an exception on some platforms
774 def get_single(f):
775 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000776 if hasattr(os, f):
777 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000778 return helper
779 for f in singles:
780 locals()["test_"+f] = get_single(f)
781
Benjamin Peterson7522c742009-01-19 21:00:09 +0000782 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000783 try:
784 f(support.make_bad_fd(), *args)
785 except OSError as e:
786 self.assertEqual(e.errno, errno.EBADF)
787 else:
788 self.fail("%r didn't raise a OSError with a bad file descriptor"
789 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000790
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000791 def test_isatty(self):
792 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000793 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000794
795 def test_closerange(self):
796 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000797 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000798 # Make sure none of the descriptors we are about to close are
799 # currently valid (issue 6542).
800 for i in range(10):
801 try: os.fstat(fd+i)
802 except OSError:
803 pass
804 else:
805 break
806 if i < 2:
807 raise unittest.SkipTest(
808 "Unable to acquire a range of invalid file descriptors")
809 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000810
811 def test_dup2(self):
812 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000813 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000814
815 def test_fchmod(self):
816 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000817 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000818
819 def test_fchown(self):
820 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000821 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000822
823 def test_fpathconf(self):
824 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000825 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000826
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000827 def test_ftruncate(self):
828 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000829 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000830
831 def test_lseek(self):
832 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000833 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000834
835 def test_read(self):
836 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000837 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000838
839 def test_tcsetpgrpt(self):
840 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000841 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000842
843 def test_write(self):
844 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000845 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000846
Brian Curtin1b9df392010-11-24 20:24:31 +0000847
848class LinkTests(unittest.TestCase):
849 def setUp(self):
850 self.file1 = support.TESTFN
851 self.file2 = os.path.join(support.TESTFN + "2")
852
Brian Curtinc0abc4e2010-11-30 23:46:54 +0000853 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +0000854 for file in (self.file1, self.file2):
855 if os.path.exists(file):
856 os.unlink(file)
857
Brian Curtin1b9df392010-11-24 20:24:31 +0000858 def _test_link(self, file1, file2):
859 with open(file1, "w") as f1:
860 f1.write("test")
861
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100862 with warnings.catch_warnings():
863 warnings.simplefilter("ignore", DeprecationWarning)
864 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +0000865 with open(file1, "r") as f1, open(file2, "r") as f2:
866 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
867
868 def test_link(self):
869 self._test_link(self.file1, self.file2)
870
871 def test_link_bytes(self):
872 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
873 bytes(self.file2, sys.getfilesystemencoding()))
874
Brian Curtinf498b752010-11-30 15:54:04 +0000875 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +0000876 try:
Brian Curtinf498b752010-11-30 15:54:04 +0000877 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +0000878 except UnicodeError:
879 raise unittest.SkipTest("Unable to encode for this platform.")
880
Brian Curtinf498b752010-11-30 15:54:04 +0000881 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +0000882 self.file2 = self.file1 + "2"
883 self._test_link(self.file1, self.file2)
884
Thomas Wouters477c8d52006-05-27 19:21:47 +0000885if sys.platform != 'win32':
886 class Win32ErrorTests(unittest.TestCase):
887 pass
888
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000889 class PosixUidGidTests(unittest.TestCase):
890 if hasattr(os, 'setuid'):
891 def test_setuid(self):
892 if os.getuid() != 0:
893 self.assertRaises(os.error, os.setuid, 0)
894 self.assertRaises(OverflowError, os.setuid, 1<<32)
895
896 if hasattr(os, 'setgid'):
897 def test_setgid(self):
898 if os.getuid() != 0:
899 self.assertRaises(os.error, os.setgid, 0)
900 self.assertRaises(OverflowError, os.setgid, 1<<32)
901
902 if hasattr(os, 'seteuid'):
903 def test_seteuid(self):
904 if os.getuid() != 0:
905 self.assertRaises(os.error, os.seteuid, 0)
906 self.assertRaises(OverflowError, os.seteuid, 1<<32)
907
908 if hasattr(os, 'setegid'):
909 def test_setegid(self):
910 if os.getuid() != 0:
911 self.assertRaises(os.error, os.setegid, 0)
912 self.assertRaises(OverflowError, os.setegid, 1<<32)
913
914 if hasattr(os, 'setreuid'):
915 def test_setreuid(self):
916 if os.getuid() != 0:
917 self.assertRaises(os.error, os.setreuid, 0, 0)
918 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
919 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000920
921 def test_setreuid_neg1(self):
922 # Needs to accept -1. We run this in a subprocess to avoid
923 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000924 subprocess.check_call([
925 sys.executable, '-c',
926 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000927
928 if hasattr(os, 'setregid'):
929 def test_setregid(self):
930 if os.getuid() != 0:
931 self.assertRaises(os.error, os.setregid, 0, 0)
932 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
933 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000934
935 def test_setregid_neg1(self):
936 # Needs to accept -1. We run this in a subprocess to avoid
937 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000938 subprocess.check_call([
939 sys.executable, '-c',
940 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +0000941
942 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +0000943 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000944 if support.TESTFN_UNENCODABLE:
945 self.dir = support.TESTFN_UNENCODABLE
946 else:
947 self.dir = support.TESTFN
948 self.bdir = os.fsencode(self.dir)
949
950 bytesfn = []
951 def add_filename(fn):
952 try:
953 fn = os.fsencode(fn)
954 except UnicodeEncodeError:
955 return
956 bytesfn.append(fn)
957 add_filename(support.TESTFN_UNICODE)
958 if support.TESTFN_UNENCODABLE:
959 add_filename(support.TESTFN_UNENCODABLE)
960 if not bytesfn:
961 self.skipTest("couldn't create any non-ascii filename")
962
963 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +0000964 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000965 try:
966 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +0200967 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +0000968 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000969 if fn in self.unicodefn:
970 raise ValueError("duplicate filename")
971 self.unicodefn.add(fn)
972 except:
973 shutil.rmtree(self.dir)
974 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +0000975
976 def tearDown(self):
977 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +0000978
979 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000980 expected = self.unicodefn
981 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000982 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +0000983
984 def test_open(self):
985 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +0200986 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +0000987 f.close()
988
989 def test_stat(self):
990 for fn in self.unicodefn:
991 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000992else:
993 class PosixUidGidTests(unittest.TestCase):
994 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +0000995 class Pep383Tests(unittest.TestCase):
996 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000997
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop ends without ``break``: the attempts ran
            # out or the subprocess exited before writing anything.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start a script with console control handling enabled, wait until
        # the first byte of a shared, tagged memory map becomes 1, then send
        # *event* to the subprocess and check that it terminated.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping once the test is over, whatever its outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1112
1113
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic links on Windows."""

    # Link names are relative paths; the targets are this file and the
    # directory that contains it.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # The targets must exist and none of the link names may be taken.
        assert os.path.exists(self.dirlink_target)
        assert os.path.exists(self.filelink_target)
        assert not os.path.exists(self.dirlink)
        assert not os.path.exists(self.filelink)
        assert not os.path.exists(self.missing_link)

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is used because this link may point to a missing target.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink, True)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # os.stat() on a relative symlink must give the same result from
        # the link's directory, from its parent and from a subdirectory.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so that the cleanup below never
        # refers to an unbound name when an early mkdir() fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so that a cleanup
            # error cannot replace the failure reported above.
            if os.path.lexists(file1):
                os.remove(file1)
            if os.path.isdir(level1):
                shutil.rmtree(level1)
1231
Brian Curtind40e6f72010-07-08 21:39:08 +00001232
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # fsencode() leaves bytes untouched and fsdecode() leaves str
        # untouched.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(fn)
            except UnicodeEncodeError:
                # The name cannot be encoded here: nothing to check.
                continue
            self.assertEqual(os.fsdecode(encoded), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001246
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001247
class PidTests(unittest.TestCase):
    """Tests for the process id functions of the os module."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1257
1258
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Tests for os.getlogin() (currently always skipped)."""

    def test_getlogin(self):
        # The login name must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
1267
1268
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise the nice value of this process by one and read it back.
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest(
      "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to restore the initial priority; a refusal reported as
            # EACCES is tolerated, any other error is propagated.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1291
1292
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Asyncore-based TCP server running its loop in a thread.

        The accepted connection is greeted with b"220 ready\\r\\n" and
        everything it sends afterwards is accumulated by a Handler.
        """

        class Handler(asynchat.async_chat):
            """Channel that buffers all the data received from the peer."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                self.push(b"220 ready\r\n")

            def handle_read(self):
                chunk = self.recv(4096)
                self.in_buffer.append(chunk)

            def get_data(self):
                """Return all the bytes received so far."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Actual (host, port) the listening socket is bound to.
            sockname = self.socket.getsockname()
            self.host, self.port = sockname[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the server thread and wait until its loop is entered."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop to terminate and wait for the thread to exit."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while True:
                if getattr(self.handler_instance, "closed", False):
                    break
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception being handled.
            raise
1376
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001377
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001378@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001379@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1380class TestSendfile(unittest.TestCase):
1381
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001382 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001383 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001384 not sys.platform.startswith("solaris") and \
1385 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001386
1387 @classmethod
1388 def setUpClass(cls):
1389 with open(support.TESTFN, "wb") as f:
1390 f.write(cls.DATA)
1391
1392 @classmethod
1393 def tearDownClass(cls):
1394 support.unlink(support.TESTFN)
1395
1396 def setUp(self):
1397 self.server = SendfileTestServer((support.HOST, 0))
1398 self.server.start()
1399 self.client = socket.socket()
1400 self.client.connect((self.server.host, self.server.port))
1401 self.client.settimeout(1)
1402 # synchronize by waiting for "220 ready" response
1403 self.client.recv(1024)
1404 self.sockno = self.client.fileno()
1405 self.file = open(support.TESTFN, 'rb')
1406 self.fileno = self.file.fileno()
1407
1408 def tearDown(self):
1409 self.file.close()
1410 self.client.close()
1411 if self.server.running:
1412 self.server.stop()
1413
1414 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1415 """A higher level wrapper representing how an application is
1416 supposed to use sendfile().
1417 """
1418 while 1:
1419 try:
1420 if self.SUPPORT_HEADERS_TRAILERS:
1421 return os.sendfile(sock, file, offset, nbytes, headers,
1422 trailers)
1423 else:
1424 return os.sendfile(sock, file, offset, nbytes)
1425 except OSError as err:
1426 if err.errno == errno.ECONNRESET:
1427 # disconnected
1428 raise
1429 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1430 # we have to retry send data
1431 continue
1432 else:
1433 raise
1434
1435 def test_send_whole_file(self):
1436 # normal send
1437 total_sent = 0
1438 offset = 0
1439 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001440 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001441 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1442 if sent == 0:
1443 break
1444 offset += sent
1445 total_sent += sent
1446 self.assertTrue(sent <= nbytes)
1447 self.assertEqual(offset, total_sent)
1448
1449 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001450 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001451 self.client.close()
1452 self.server.wait()
1453 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001454 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001455 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001456
1457 def test_send_at_certain_offset(self):
1458 # start sending a file at a certain offset
1459 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001460 offset = len(self.DATA) // 2
1461 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001462 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001463 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001464 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1465 if sent == 0:
1466 break
1467 offset += sent
1468 total_sent += sent
1469 self.assertTrue(sent <= nbytes)
1470
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001471 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001472 self.client.close()
1473 self.server.wait()
1474 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001475 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001476 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001477 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001478 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001479
1480 def test_offset_overflow(self):
1481 # specify an offset > file size
1482 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001483 try:
1484 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1485 except OSError as e:
1486 # Solaris can raise EINVAL if offset >= file length, ignore.
1487 if e.errno != errno.EINVAL:
1488 raise
1489 else:
1490 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001491 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001492 self.client.close()
1493 self.server.wait()
1494 data = self.server.handler_instance.get_data()
1495 self.assertEqual(data, b'')
1496
1497 def test_invalid_offset(self):
1498 with self.assertRaises(OSError) as cm:
1499 os.sendfile(self.sockno, self.fileno, -1, 4096)
1500 self.assertEqual(cm.exception.errno, errno.EINVAL)
1501
1502 # --- headers / trailers tests
1503
1504 if SUPPORT_HEADERS_TRAILERS:
1505
1506 def test_headers(self):
1507 total_sent = 0
1508 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1509 headers=[b"x" * 512])
1510 total_sent += sent
1511 offset = 4096
1512 nbytes = 4096
1513 while 1:
1514 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1515 offset, nbytes)
1516 if sent == 0:
1517 break
1518 total_sent += sent
1519 offset += sent
1520
1521 expected_data = b"x" * 512 + self.DATA
1522 self.assertEqual(total_sent, len(expected_data))
1523 self.client.close()
1524 self.server.wait()
1525 data = self.server.handler_instance.get_data()
1526 self.assertEqual(hash(data), hash(expected_data))
1527
1528 def test_trailers(self):
1529 TESTFN2 = support.TESTFN + "2"
Brett Cannonb6376802011-03-15 17:38:22 -04001530 with open(TESTFN2, 'wb') as f:
1531 f.write(b"abcde")
1532 with open(TESTFN2, 'rb')as f:
1533 self.addCleanup(os.remove, TESTFN2)
1534 os.sendfile(self.sockno, f.fileno(), 0, 4096,
1535 trailers=[b"12345"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001536 self.client.close()
1537 self.server.wait()
1538 data = self.server.handler_instance.get_data()
1539 self.assertEqual(data, b"abcde12345")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001540
1541 if hasattr(os, "SF_NODISKIO"):
1542 def test_flags(self):
1543 try:
1544 os.sendfile(self.sockno, self.fileno, 0, 4096,
1545 flags=os.SF_NODISKIO)
1546 except OSError as err:
1547 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1548 raise
1549
1550
def supports_extended_attributes():
    """Return True if usable extended attribute support is detected.

    Returns False when ``os.setxattr`` is missing, or when setting a
    ``user.test`` attribute on a scratch file fails with ENOTSUP.  Any
    other OSError from that probe is propagated.  When the probe succeeds
    the result additionally depends on the kernel release: 2.6.x releases
    older than 2.6.39 are rejected.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.fsetxattr(fp.fileno(), b"user.test", b"")
            except OSError as e:
                if e.errno != errno.ENOTSUP:
                    raise
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: "\d" is not a valid str escape and an
    # unescaped "." would match any character, not just a literal dot.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
1568
1569
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for the get/set/remove/list extended attribute functions.

    The same scenario is run against the path based, the "l"-prefixed and
    the file descriptor based function families, with attribute names
    given both as str and as bytes.
    """

    def tearDown(self):
        # Remove the scratch file created by the checks.
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
        """Run the xattr scenario with the four given functions.

        *s* converts a str attribute name into the name type under test
        (see _check_xattrs(), which passes ``str`` and a bytes converter).
        """
        fn = support.TESTFN
        open(fn, "wb").close()
        # Reading an attribute that was never set must fail with ENODATA.
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"))
        self.assertEqual(cm.exception.errno, errno.ENODATA)
        # Attributes already present on the new file are remembered so the
        # comparisons below only depend on what this test adds.
        init_xattr = listxattr(fn)
        self.assertIsInstance(init_xattr, list)
        setxattr(fn, s("user.test"), b"")
        xattr = set(init_xattr)
        xattr.add("user.test")
        self.assertEqual(set(listxattr(fn)), xattr)
        self.assertEqual(getxattr(fn, b"user.test"), b"")
        # XATTR_REPLACE succeeds on an existing attribute ...
        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE)
        self.assertEqual(getxattr(fn, b"user.test"), b"hello")
        # ... XATTR_CREATE fails with EEXIST on an existing attribute ...
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE)
        self.assertEqual(cm.exception.errno, errno.EEXIST)
        # ... and XATTR_REPLACE fails with ENODATA on a missing one.
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE)
        self.assertEqual(cm.exception.errno, errno.ENODATA)
        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE)
        xattr.add("user.test2")
        self.assertEqual(set(listxattr(fn)), xattr)
        # After removal the attribute is gone from both get and list.
        removexattr(fn, s("user.test"))
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"))
        self.assertEqual(cm.exception.errno, errno.ENODATA)
        xattr.remove("user.test")
        self.assertEqual(set(listxattr(fn)), xattr)
        self.assertEqual(getxattr(fn, s("user.test2")), b"foo")
        # A 1024-byte value round-trips unchanged.
        setxattr(fn, s("user.test"), b"a"*1024)
        self.assertEqual(getxattr(fn, s("user.test")), b"a"*1024)
        removexattr(fn, s("user.test"))
        # Set 100 attributes and check that all of them are listed.
        many = sorted("user.test{}".format(i) for i in range(100))
        for thing in many:
            setxattr(fn, thing, b"x")
        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args):
        """Run the scenario twice: with str names, then with bytes names."""
        def make_bytes(s):
            return bytes(s, "ascii")
        self._check_xattrs_str(str, *args)
        # Start the second run from a fresh file.
        support.unlink(support.TESTFN)
        self._check_xattrs_str(make_bytes, *args)

    def test_simple(self):
        # Path based functions.
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        # "l"-prefixed variants of the path based functions.
        self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
                           os.llistxattr)

    def test_fds(self):
        # File descriptor based functions, adapted to the path based
        # signature by opening the file for the duration of each call.
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.fgetxattr(fp.fileno(), *args)
        def setxattr(path, *args):
            with open(path, "wb") as fp:
                os.fsetxattr(fp.fileno(), *args)
        def removexattr(path, *args):
            with open(path, "wb") as fp:
                os.fremovexattr(fp.fileno(), *args)
        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.flistxattr(fp.fileno(), *args)
        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1645
1646
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Check that bytes filenames trigger DeprecationWarning on Windows."""

    def test_deprecated(self):
        """Each listed call must raise once the warning is an error."""
        import nt
        bytes_name = os.fsencode(support.TESTFN)
        # (callable, *arguments) for every API under test.
        calls = (
            (nt._getfullpathname, bytes_name),
            (nt._isdir, bytes_name),
            (os.access, bytes_name, os.R_OK),
            (os.chdir, bytes_name),
            (os.chmod, bytes_name, 0o777),
            (os.getcwdb,),
            (os.link, bytes_name, bytes_name),
            (os.listdir, bytes_name),
            (os.lstat, bytes_name),
            (os.mkdir, bytes_name),
            (os.open, bytes_name, os.O_RDONLY),
            (os.rename, bytes_name, bytes_name),
            (os.rmdir, bytes_name),
            (os.startfile, bytes_name),
            (os.stat, bytes_name),
            (os.unlink, bytes_name),
            (os.utime, bytes_name),
        )
        with warnings.catch_warnings():
            # Turn the warning into an exception so it can be asserted on.
            warnings.simplefilter("error", DeprecationWarning)
            for func, *args in calls:
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        """os.symlink() with bytes names must raise as well."""
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(bytes_name, bytes_name)
1682
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001683
@support.reap_threads
def test_main():
    """Run the test case classes listed below via support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
    )
    support.run_unittest(*test_classes)
Fred Drake2e2be372001-09-20 21:33:42 +00001710
# Run the test classes when this file is executed as a script.
if __name__ == "__main__":
    test_main()