blob: 79e66fb05612da911638a4f3ac12a7de39cdf2d9 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
23try:
24 import threading
25except ImportError:
26 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000027
# Flag Linux systems that still run on the outdated, unmaintained
# linuxthreads threading library.  Combining linuxthreads with a failed
# execv call is problematic: see http://bugs.python.org/issue4970.
# The flag is False when sys.thread_info is missing or reports no version.
USING_LINUXTHREADS = (
    hasattr(sys, 'thread_info')
    and bool(sys.thread_info.version)
    and sys.thread_info.version.startswith("linuxthreads")
)
Brian Curtineb24d742010-04-12 17:16:38 +000036
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for the file-descriptor level functions of the os module.

    Every test starts and ends with support.TESTFN removed, so each test
    creates the file it needs.
    """

    def setUp(self):
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same "remove TESTFN if present" step doubles as the cleanup.
    tearDown = setUp

    def test_access(self):
        """A freshly created file is reported as writable by os.access()."""
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        """os.closerange() closes an open fd and tolerates a closed one."""
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        """A failing os.rename() must not change the path's refcount."""
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        """os.read() returns a bytes object with the data written."""
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails;
            # a leaked fd would keep TESTFN open while tearDown unlinks it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        """os.fdopen() accepts no extra args, a mode, and a mode plus buffering."""
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)
131
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200132
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Check the result objects returned by os.stat() and os.statvfs().

    setUp creates the directory support.TESTFN containing a three-byte
    file "f1"; tearDown removes both again.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        """Verify the os.stat() result for *fname* (str or bytes path).

        Checks index and attribute access, that the result is read-only,
        and how the os.stat_result constructor reacts to tuples of the
        wrong length.
        """
        if not hasattr(os, "stat"):
            self.skipTest("os.stat() is not available")

        import stat
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # The indexed time fields are compared against the
                    # integer part of the attribute value.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Either outcome
        # (acceptance or TypeError) is tolerated here.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other error is a genuine failure; do not fall through
            # to the checks below with 'result' unbound.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for index, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[index])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  Either outcome
        # (acceptance or TypeError) is tolerated here.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def test_utime_noargs(self):
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st = os.stat(support.TESTFN)
        # Doesn't set anything new, but sets the time tuple way
        os.utime(support.TESTFN, (st.st_atime, st.st_mtime))
        # Set to the current time in the old explicit way.
        os.utime(support.TESTFN, None)
        st1 = os.stat(support.TESTFN)
        # Set to the current time in the new way
        os.utime(support.TESTFN)
        st2 = os.stat(support.TESTFN)
        self.assertAlmostEqual(st1.st_mtime, st2.st_mtime, delta=10)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            """Return the file system name of the volume holding *path*.

            Returns None when GetVolumeInformationW reports failure.
            """
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == errno.ENOENT:
                    # file does not exist; cannot run test
                    self.skipTest(r"c:\pagefile.sys does not exist")
                self.fail("Could not stat pagefile.sys")
320
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000321from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000322
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment so tearDown can restore it, then seed
        # os.environ with the reference items.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_update2(self):
        """Variables set through os.environ.update() reach child processes."""
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_os_popen_iter(self):
        """The object returned by os.popen() iterates over output lines."""
        with os.popen(
            "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
            it = iter(popen)
            self.assertEqual(next(it), "line1\n")
            self.assertEqual(next(it), "line2\n")
            self.assertEqual(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100463
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        """Walk a small tree top-down, pruned, bottom-up and via symlinks."""
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    # A symlink to a directory is removed like a file.
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000571
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside the directory support.TESTFN."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # The umask is process-wide state: restore it even when one of
            # the checks above fails, so later tests are not affected.
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Remove the file even on failure; tearDown only knows how to
            # remove directories.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
623
class DevNullTests(unittest.TestCase):
    """Tests for the null device named by os.devnull."""

    def test_devnull(self):
        """Writes to os.devnull succeed and reads from it return no data."""
        # The with statement closes the file; no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000631
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom(self):
        """os.urandom(n) returns exactly n bytes for several sizes.

        A NotImplementedError from os.urandom() is tolerated and ends the
        test without failure.
        """
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            pass
641
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If defpath is not None, os.defpath is replaced by it for the duration
    of the context.  The context manager yields the list of recorded calls;
    os.execv, os.execve and os.defpath are restored on exit.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Capture the originals before entering the try block, so the finally
    # clause can never refer to a name that was not yet bound.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
674
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        """Executing a program that cannot be found raises OSError."""
        missing = 'no such app-'
        self.assertRaises(OSError, os.execvpe, missing, [missing], None)

    def test_execvpe_with_bad_arglist(self):
        """An empty argument list is rejected with ValueError."""
        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with str or bytes program names.

        *test_type* is either ``str`` or ``bytes`` and selects the type of
        the program name.  os.execv/os.execve are stubbed out, so nothing
        is actually executed; only the recorded calls are inspected.
        """
        program_path = os.sep + 'absolutepath'
        as_bytes = test_type is bytes
        if as_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            self.assertRaises(RuntimeError, os._execvpe, fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        env_path = env.copy()
        env_path[b'PATH' if as_bytes else 'PATH'] = program_path
        with _execvpe_mockup() as calls:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected)

    def test_internal_execvpe_str(self):
        """Run the os._execvpe() checks for str and, off Windows, bytes."""
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000738
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000739
class Win32ErrorTests(unittest.TestCase):
    """os calls on a missing or conflicting path must raise WindowsError."""

    def test_rename(self):
        """Renaming the (absent) test file fails."""
        with self.assertRaises(WindowsError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        """Removing the (absent) test file fails."""
        with self.assertRaises(WindowsError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        """Changing directory to the (absent) test path fails."""
        with self.assertRaises(WindowsError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        """Creating a directory over an existing open file fails."""
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(WindowsError):
                os.mkdir(support.TESTFN)
        finally:
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        """Setting times on the (absent) test file fails."""
        with self.assertRaises(WindowsError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        """Changing mode of the (absent) test file fails."""
        with self.assertRaises(WindowsError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000763
class TestInvalidFD(unittest.TestCase):
    """Check how os functions behave when handed an invalid descriptor.

    Most functions are expected to raise OSError with errno EBADF; see
    check().  Functions taking only a file descriptor are listed in
    ``singles`` and get a generated ``test_<name>`` method each.
    """

    # Names of os functions that take a file descriptor as sole argument.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    # We omit close because it doesn't raise an exception on some platforms.
    def get_single(f):
        # Build a test method for the os function named *f*; the generated
        # test does nothing when the running platform lacks that function.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    # Install one generated test per name directly into the class
    # namespace while the class body is being executed.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require OSError with errno EBADF.

        The test fails if no OSError is raised at all.
        """
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    def test_isatty(self):
        """os.isatty() returns False rather than raising."""
        if hasattr(os, "isatty"):
            self.assertEqual(os.isatty(support.make_bad_fd()), False)

    def test_closerange(self):
        """os.closerange() over invalid descriptors returns None."""
        if hasattr(os, "closerange"):
            fd = support.make_bad_fd()
            # Make sure none of the descriptors we are about to close are
            # currently valid (issue 6542).
            # After the loop, i is the offset of the first descriptor that
            # fstat() accepted, or 9 if none of the ten probed was valid.
            for i in range(10):
                try: os.fstat(fd+i)
                except OSError:
                    pass
                else:
                    break
            if i < 2:
                raise unittest.SkipTest(
                    "Unable to acquire a range of invalid file descriptors")
            self.assertEqual(os.closerange(fd, fd + i-1), None)

    def test_dup2(self):
        """os.dup2(bad_fd, 20) raises EBADF."""
        if hasattr(os, "dup2"):
            self.check(os.dup2, 20)

    def test_fchmod(self):
        """os.fchmod(bad_fd, 0) raises EBADF."""
        if hasattr(os, "fchmod"):
            self.check(os.fchmod, 0)

    def test_fchown(self):
        """os.fchown(bad_fd, -1, -1) raises EBADF."""
        if hasattr(os, "fchown"):
            self.check(os.fchown, -1, -1)

    def test_fpathconf(self):
        """os.fpathconf(bad_fd, "PC_NAME_MAX") raises EBADF."""
        if hasattr(os, "fpathconf"):
            self.check(os.fpathconf, "PC_NAME_MAX")

    def test_ftruncate(self):
        """os.ftruncate(bad_fd, 0) raises EBADF."""
        if hasattr(os, "ftruncate"):
            self.check(os.ftruncate, 0)

    def test_lseek(self):
        """os.lseek(bad_fd, 0, 0) raises EBADF."""
        if hasattr(os, "lseek"):
            self.check(os.lseek, 0, 0)

    def test_read(self):
        """os.read(bad_fd, 1) raises EBADF."""
        if hasattr(os, "read"):
            self.check(os.read, 1)

    def test_tcsetpgrpt(self):
        """os.tcsetpgrp(bad_fd, 0) raises EBADF."""
        if hasattr(os, "tcsetpgrp"):
            self.check(os.tcsetpgrp, 0)

    def test_write(self):
        """os.write(bad_fd, b" ") raises EBADF."""
        if hasattr(os, "write"):
            self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000841
Brian Curtin1b9df392010-11-24 20:24:31 +0000842
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        """Pick the two file names used as link source and destination."""
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        """Delete whichever of the two files a test left behind."""
        leftovers = [name for name in (self.file1, self.file2)
                     if os.path.exists(name)]
        for name in leftovers:
            os.unlink(name)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it to *file2*, and verify the link."""
        with open(file1, "w") as source:
            source.write("test")

        # DeprecationWarning raised by os.link() is silenced here.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)

        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        """Link using str file names."""
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        """Link using file names encoded with the filesystem encoding."""
        encoding = sys.getfilesystemencoding()
        self._test_link(bytes(self.file1, encoding),
                        bytes(self.file2, encoding))

    def test_unicode_name(self):
        """Link using a file name containing a non-ASCII character."""
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
879
if sys.platform != 'win32':
    class Win32ErrorTests(unittest.TestCase):
        """Empty stand-in: the Windows error tests do not apply here."""

    class PosixUidGidTests(unittest.TestCase):
        """Tests for the os.set*uid()/os.set*gid() family.

        Each test is only defined when the running platform provides the
        function under test.  Privilege changes to id 0 are expected to
        fail unless the tests already run as uid 0.
        """

        if hasattr(os, 'setuid'):
            def test_setuid(self):
                """setuid(0) fails for non-root; huge ids overflow."""
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setuid, 0)
                self.assertRaises(OverflowError, os.setuid, 1 << 32)

        if hasattr(os, 'setgid'):
            def test_setgid(self):
                """setgid(0) fails for non-root; huge ids overflow."""
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setgid, 0)
                self.assertRaises(OverflowError, os.setgid, 1 << 32)

        if hasattr(os, 'seteuid'):
            def test_seteuid(self):
                """seteuid(0) fails for non-root; huge ids overflow."""
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.seteuid, 0)
                self.assertRaises(OverflowError, os.seteuid, 1 << 32)

        if hasattr(os, 'setegid'):
            def test_setegid(self):
                """setegid(0) fails for non-root; huge ids overflow."""
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setegid, 0)
                self.assertRaises(OverflowError, os.setegid, 1 << 32)

        if hasattr(os, 'setreuid'):
            def test_setreuid(self):
                """setreuid(0, 0) fails for non-root; huge ids overflow."""
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setreuid, 0, 0)
                self.assertRaises(OverflowError, os.setreuid, 1 << 32, 0)
                self.assertRaises(OverflowError, os.setreuid, 0, 1 << 32)

            def test_setreuid_neg1(self):
                """setreuid(-1, -1) must be accepted."""
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                child_code = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
                subprocess.check_call([sys.executable, '-c', child_code])

        if hasattr(os, 'setregid'):
            def test_setregid(self):
                """setregid(0, 0) fails for non-root; huge ids overflow."""
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setregid, 0, 0)
                self.assertRaises(OverflowError, os.setregid, 1 << 32, 0)
                self.assertRaises(OverflowError, os.setregid, 0, 1 << 32)

            def test_setregid_neg1(self):
                """setregid(-1, -1) must be accepted."""
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                child_code = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
                subprocess.check_call([sys.executable, '-c', child_code])

    class Pep383Tests(unittest.TestCase):
        """File-name round-trip tests using non-ASCII directory entries."""

        def setUp(self):
            """Create a directory holding files with non-ASCII names.

            The test is skipped when none of the candidate names can be
            encoded with os.fsencode().
            """
            self.dir = support.TESTFN_UNENCODABLE or support.TESTFN
            self.bdir = os.fsencode(self.dir)

            # Collect the candidate names that can be encoded to bytes.
            candidates = [support.TESTFN_UNICODE]
            if support.TESTFN_UNENCODABLE:
                candidates.append(support.TESTFN_UNENCODABLE)
            bytesfn = []
            for name in candidates:
                try:
                    encoded = os.fsencode(name)
                except UnicodeEncodeError:
                    continue
                bytesfn.append(encoded)
            if not bytesfn:
                self.skipTest("couldn't create any non-ascii filename")

            self.unicodefn = set()
            os.mkdir(self.dir)
            try:
                for encoded in bytesfn:
                    support.create_empty_file(
                        os.path.join(self.bdir, encoded))
                    decoded = os.fsdecode(encoded)
                    if decoded in self.unicodefn:
                        raise ValueError("duplicate filename")
                    self.unicodefn.add(decoded)
            except BaseException:
                # Do not leave a half-populated directory behind.
                shutil.rmtree(self.dir)
                raise

        def tearDown(self):
            """Remove the directory created by setUp."""
            shutil.rmtree(self.dir)

        def test_listdir(self):
            """os.listdir() returns exactly the decoded file names."""
            self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)

        def test_open(self):
            """Every file can be opened through its decoded name."""
            for name in self.unicodefn:
                with open(os.path.join(self.dir, name), 'rb'):
                    pass

        def test_stat(self):
            """Every file can be stat()ed through its decoded name."""
            for name in self.unicodefn:
                os.stat(os.path.join(self.dir, name))
else:
    class PosixUidGidTests(unittest.TestCase):
        """Empty stand-in: the uid/gid tests do not apply on win32."""

    class Pep383Tests(unittest.TestCase):
        """Empty stand-in: the PEP 383 tests do not apply on win32."""
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000992
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        """Kill a ready child interpreter with *sig* and check its exit code.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready.  When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting.  This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1 second apart, for the ready message.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        """os.kill() with signal.SIGTERM terminates the child."""
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        """os.kill() with a plain int uses it as the child's exit code."""
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and require it to exit.

        *name* is only used in the failure message.  The child signals
        readiness by setting the first byte of a tagged shared mmap to 1.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared mapping when the test finishes.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; a
        # plain truth test would also misreport an exit code of 0.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        """A child with console handling enabled exits on CTRL_C_EVENT."""
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        """A child with console handling enabled exits on CTRL_BREAK_EVENT."""
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1107
1108
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and related calls on Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        """Check that the targets exist and no link is left over."""
        assert os.path.exists(self.dirlink_target)
        assert os.path.exists(self.filelink_target)
        assert not os.path.exists(self.dirlink)
        assert not os.path.exists(self.filelink)
        assert not os.path.exists(self.missing_link)

    def tearDown(self):
        """Remove any link a test created."""
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is used because the missing link's target never exists.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        """A symlink to a directory exists, is a dir and is a link."""
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        """A symlink to a file exists, is a file and is a link."""
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        """Create a "directory" link to a non-existent target"""
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        """os.remove() deletes a directory link whose target is missing."""
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """stat() follows *link* to *target*; lstat() does not.

        The same is checked for the bytes form of the link name.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        """os.stat() resolves a relative symlink from several directories."""
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Bound before the try block so that the cleanup code can always
        # refer to it, even when creating the directories fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so a cleanup error
            # cannot hide the original failure.
            if os.path.exists(file1):
                os.remove(file1)
            if os.path.exists(level1):
                shutil.rmtree(level1)
1226
Brian Curtind40e6f72010-07-08 21:39:08 +00001227
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        """Values already of the target type are returned unchanged."""
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        """fsdecode(fsencode(x)) == x for every encodable name."""
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding: skip it.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001241
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001242
class PidTests(unittest.TestCase):
    """Tests for process-id related functions."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        """A child interpreter reports this process as its parent."""
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1252
1253
Brian Curtin0151b8e2010-09-24 13:43:43 +00001254# The introduction of this TestCase caused at least two different errors on
1255# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Tests for os.getlogin() (currently skipped unconditionally)."""

    def test_getlogin(self):
        """os.getlogin() returns a non-empty name."""
        name_length = len(os.getlogin())
        self.assertNotEqual(name_length, 0)
1262
1263
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        """Raising the nice value by one is reflected by getpriority()."""
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest(
                    "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to restore the original priority; an EACCES refusal is
            # ignored, any other error propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1286
1287
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Threaded asyncore server that stores whatever a client sends.

        The server accepts a connection, greets it with "220 ready" and
        accumulates every received chunk in the handler's buffer.
        """

        class Handler(asynchat.async_chat):
            """Per-connection handler buffering all incoming data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before sending anything.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            """Bind a listening socket to *address* (a (host, port) pair)."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the server thread and block until its loop is running."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the serving loop to finish and wait for the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # The with statement releases the lock even when the
                # asyncore loop raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1371
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001372
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer.

    setUpClass() writes DATA to support.TESTFN.  Every test gets a fresh
    server and a connected client socket, pushes (part of) a file through
    os.sendfile() and compares it with what the server's handler collected.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are only exercised off these platforms.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError (ECONNRESET included) propagates to the caller.  *headers*
        and *trailers* default to empty lists and are only forwarded when
        SUPPORT_HEADERS_TRAILERS is true.  Returns whatever os.sendfile()
        returns.
        """
        # None instead of a shared mutable [] default.
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                # EAGAIN / EBUSY: we have to retry sending the data.
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            # Compare the payload itself rather than its hash; the length
            # check first keeps a size mismatch from producing a huge diff.
            self.assertEqual(len(data), len(expected_data))
            self.assertEqual(data, expected_data)

        def test_trailers(self):
            TESTFN2 = support.TESTFN + "2"
            # Register the cleanup before creating the file so it is
            # removed even if writing or reopening it fails.
            self.addCleanup(support.unlink, TESTFN2)
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcde12345")

    if hasattr(os, "SF_NODISKIO"):
        def test_flags(self):
            try:
                os.sendfile(self.sockno, self.fileno, 0, 4096,
                            flags=os.SF_NODISKIO)
            except OSError as err:
                # EBUSY / EAGAIN are tolerated here; anything else is a
                # real failure.
                if err.errno not in (errno.EBUSY, errno.EAGAIN):
                    raise
1544
1545
def supports_extended_attributes():
    """Return True if usable extended-attribute support is available.

    Returns False when ``os`` has no ``setxattr``, or when setting an
    attribute on a scratch file (support.TESTFN) fails with ENOTSUP; any
    other OSError from that probe propagates.  The scratch file is always
    removed.  For 2.6.x kernel releases the result is additionally False
    below 2.6.39.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.fsetxattr(fp.fileno(), b"user.test", b"")
            except OSError as e:
                if e.errno != errno.ENOTSUP:
                    raise
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string so \d is a regex escape, and escaped dots so that only a
    # literal "2.6." prefix is recognised.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
1563
1564
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Run one shared xattr scenario through the path, l*-path and fd APIs."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
        """Drive the four given xattr callables through a fixed scenario.

        *s* converts a str attribute name into the name type under test.
        The scenario runs on a freshly created support.TESTFN.
        """
        def expect_errno(code, func, *args):
            # func(*args) must fail with an OSError carrying errno *code*.
            with self.assertRaises(OSError) as cm:
                func(*args)
            self.assertEqual(cm.exception.errno, code)

        path = support.TESTFN
        open(path, "wb").close()

        # Nothing has been set yet.
        expect_errno(errno.ENODATA, getxattr, path, s("user.test"))
        baseline = listxattr(path)
        self.assertIsInstance(baseline, list)

        # Create an attribute, then replace its value.
        setxattr(path, s("user.test"), b"")
        expected = set(baseline)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test"), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE)
        self.assertEqual(getxattr(path, b"user.test"), b"hello")

        # XATTR_CREATE on an existing name and XATTR_REPLACE on a missing
        # one must both fail.
        expect_errno(errno.EEXIST,
                     setxattr, path, s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA,
                     setxattr, path, s("user.test2"), b"bye",
                     os.XATTR_REPLACE)
        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removing one attribute leaves the other intact.
        removexattr(path, s("user.test"))
        expect_errno(errno.ENODATA, getxattr, path, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2")), b"foo")

        # A 1024-byte value round-trips.
        big_value = b"a" * 1024
        setxattr(path, s("user.test"), big_value)
        self.assertEqual(getxattr(path, s("user.test")), big_value)
        removexattr(path, s("user.test"))

        # Many attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for name in many:
            setxattr(path, name, b"x")
        self.assertEqual(set(listxattr(path)), set(baseline) | set(many))

    def _check_xattrs(self, *args):
        """Run the scenario with str names, then again with bytes names."""
        self._check_xattrs_str(str, *args)
        # Start the second pass from a clean file.
        support.unlink(support.TESTFN)
        self._check_xattrs_str(lambda text: text.encode("ascii"), *args)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
                           os.llistxattr)

    def test_fds(self):
        def via_fd(mode, fd_func):
            # Adapt an fd-based call to the path-based signature used by
            # the shared scenario: open *path* with *mode*, pass the fd.
            def call(path, *args):
                with open(path, mode) as fp:
                    return fd_func(fp.fileno(), *args)
            return call

        self._check_xattrs(via_fd("rb", os.fgetxattr),
                           via_fd("wb", os.fsetxattr),
                           via_fd("wb", os.fremovexattr),
                           via_fd("rb", os.flistxattr))
1640
1641
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Check that bytes filenames raise DeprecationWarning on Windows."""

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            # Turn the warning into an exception so assertRaises sees it.
            warnings.simplefilter("error", DeprecationWarning)
            calls = (
                (nt._getfullpathname, filename),
                (nt._isdir, filename),
                (os.access, filename, os.R_OK),
                (os.chdir, filename),
                (os.chmod, filename, 0o777),
                (os.getcwdb,),
                (os.link, filename, filename),
                (os.listdir, filename),
                (os.lstat, filename),
                (os.mkdir, filename),
                (os.open, filename, os.O_RDONLY),
                (os.rename, filename, filename),
                (os.rmdir, filename),
                (os.startfile, filename),
                (os.stat, filename),
                (os.unlink, filename),
                (os.utime, filename),
            )
            for entry in calls:
                func, args = entry[0], entry[1:]
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(filename, filename)
1677
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001678
@support.reap_threads
def test_main():
    """Run every test case class of this module via support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
    )
    support.run_unittest(*test_classes)
Fred Drake2e2be372001-09-20 21:33:42 +00001705
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()