blob: ef5f8052aae1f4ab81c75c32ea389bcd84ee3681 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
23try:
24 import threading
25except ImportError:
26 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000027
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
#
# The flag is False whenever sys.thread_info is missing or reports an
# empty/None version string.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)
Brian Curtineb24d742010-04-12 17:16:38 +000036
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for os functions that work on a file created at support.TESTFN."""

    def setUp(self):
        # Remove any TESTFN left behind so that each test starts from scratch.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The very same removal is used as the per-test cleanup.
    tearDown = setUp

    def test_access(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must leave the reference count of its
        # path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # Rewind the descriptor so that os.read() sees what was written.
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails,
            # so that tearDown can still remove TESTFN.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Helper: run the command given by *args* and require exit status 0.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Helper: open TESTFN read-only, wrap the descriptor with
        # os.fdopen(fd, *args) and close the resulting file object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create TESTFN first; fdopen_helper() opens it without O_CREAT.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)
131
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200132
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Check the result objects of os.stat()/os.statvfs() and os.utime()."""

    def setUp(self):
        # Fixture: directory TESTFN containing a 3-byte file "f1".
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        # Shared body of the str and bytes variants: *fname* is the path
        # handed to os.stat().
        if not hasattr(os, "stat"):
            return

        import stat
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                # The *TIME attributes are truncated with int() before
                # being compared with the tuple item.
                if name.endswith("TIME"):
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                 result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Either outcome is
        # accepted here; only an exception other than TypeError is an error.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        # Silence any DeprecationWarning raised while using a bytes path.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            return

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                return
            # Any other failure is a real error; swallowing it would only
            # surface later as a NameError on the unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  Either outcome is
        # accepted here; only an exception other than TypeError is an error.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def test_utime_noargs(self):
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st = os.stat(support.TESTFN)
        # Doesn't set anything new, but sets the time tuple way
        os.utime(support.TESTFN, (st.st_atime, st.st_mtime))
        # Set to the current time in the old explicit way.
        os.utime(support.TESTFN, None)
        st1 = os.stat(support.TESTFN)
        # Set to the current time in the new way
        os.utime(support.TESTFN)
        st2 = os.stat(support.TESTFN)
        self.assertAlmostEqual(st1.st_mtime, st2.st_mtime, delta=10)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Return the file system name reported by GetVolumeInformationW
            # for the drive holding *path*; falls through (returning None)
            # when the call reports failure.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
320
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000321from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000322
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """check that os.environ object conform to mapping protocol"""
    type2test = None

    def setUp(self):
        # Snapshot the environment (and its bytes view, when available) so
        # that tearDown can restore it, then install the reference entries.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    # Reported as skipped (rather than silently passing) without /bin/sh.
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    # Reported as skipped (rather than silently passing) without /bin/sh.
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_os_popen_iter(self):
        with os.popen(
            "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
            it = iter(popen)
            self.assertEqual(next(it), "line1\n")
            self.assertEqual(next(it), "line2\n")
            self.assertEqual(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily swap os.environ for a plain dict; always put the real
        # object back.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100459
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # 'with' guarantees the handle is closed even if the write
            # fails, so tearDown can still delete the file.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        # The expected entry for SUB2 depends on whether the symlink could
        # be created.
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A directory symlink is removed as a link, not as a
                # directory.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000567
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            # NOTE(review): the os.makedirs documentation says the mode
            # mismatch error with exist_ok=True was removed in Python 3.4.1;
            # this check targets the earlier behaviour -- confirm against
            # the interpreter version this file is run with.
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # The umask is process-wide state: restore it even when one of
            # the checks above fails, so later tests are not affected.
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
619
class DevNullTests(unittest.TestCase):
    """Tests for the null device named by os.devnull."""

    def test_devnull(self):
        # Writing to the null device must succeed ...
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        # ... and reading from it must yield nothing.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000627
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom(self):
        # os.urandom(n) must return exactly n bytes.  A platform where it
        # raises NotImplementedError is tolerated.
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            pass
637
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is replaced by it for the duration
    of the context.  The context manager yields the list of recorded calls;
    os.execv, os.execve and os.defpath are restored on exit.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals before entering the try block: should one of these
    # lookups fail, the finally clause must not hide that error behind an
    # unbound local name.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
670
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        """Executing a program that cannot be found raises OSError."""
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        """An empty argument list is rejected with ValueError."""
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Check which exec call os._execvpe() makes for str or bytes input.

        *test_type* is ``bytes`` to drive the bytes code path; any other
        value drives the str path.  os.execv and os.execve are stubbed out
        with _execvpe_mockup(), so nothing is really executed.
        """
        search_dir = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = search_dir
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected)

    def test_internal_execvpe_str(self):
        """Run the os._execvpe() checks for str and, off Windows, bytes."""
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000734
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000735
class Win32ErrorTests(unittest.TestCase):
    """os calls on support.TESTFN that are expected to raise WindowsError."""

    def test_rename(self):
        with self.assertRaises(WindowsError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # A regular file holds the name while mkdir() is attempted; the
        # file is closed and then removed whether or not the check passes.
        stream = open(support.TESTFN, "w")
        try:
            with stream:
                self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
        finally:
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000759
class TestInvalidFD(unittest.TestCase):
    """os functions called with a bad file descriptor must fail with EBADF."""

    # Names of os functions that take the file descriptor as their only
    # argument; one test_<name> method is generated for each of them below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    # "close" is omitted because it doesn't raise an exception on some
    # platforms.
    def get_single(f):
        def helper(self):
            # Nothing to check when this platform lacks the function.
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require an OSError carrying EBADF."""
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
            return
        self.fail("%r didn't raise a OSError with a bad file descriptor"
                  % f)

    def test_isatty(self):
        if not hasattr(os, "isatty"):
            return
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    def test_closerange(self):
        if not hasattr(os, "closerange"):
            return
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                continue
            # fd+i is a valid descriptor: the invalid range stops here.
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    def test_dup2(self):
        if not hasattr(os, "dup2"):
            return
        self.check(os.dup2, 20)

    def test_fchmod(self):
        if not hasattr(os, "fchmod"):
            return
        self.check(os.fchmod, 0)

    def test_fchown(self):
        if not hasattr(os, "fchown"):
            return
        self.check(os.fchown, -1, -1)

    def test_fpathconf(self):
        if not hasattr(os, "fpathconf"):
            return
        self.check(os.fpathconf, "PC_NAME_MAX")

    def test_ftruncate(self):
        if not hasattr(os, "ftruncate"):
            return
        self.check(os.ftruncate, 0)

    def test_lseek(self):
        if not hasattr(os, "lseek"):
            return
        self.check(os.lseek, 0, 0)

    def test_read(self):
        if not hasattr(os, "read"):
            return
        self.check(os.read, 1)

    def test_tcsetpgrpt(self):
        if not hasattr(os, "tcsetpgrp"):
            return
        self.check(os.tcsetpgrp, 0)

    def test_write(self):
        if not hasattr(os, "write"):
            return
        self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000837
Brian Curtin1b9df392010-11-24 20:24:31 +0000838
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove both names if a test left them behind.
        for leftover in (self.file1, self.file2):
            if os.path.exists(leftover):
                os.unlink(leftover)

    def _test_link(self, file1, file2):
        """Create file1, hard-link it as file2, check both are one file."""
        with open(file1, "w") as source:
            source.write("test")

        # DeprecationWarning is silenced only around the link() call.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(bytes(self.file1, encoding),
                        bytes(self.file2, encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
875
if sys.platform != 'win32':
    # Off Windows, this empty class rebinds the Win32ErrorTests name.
    class Win32ErrorTests(unittest.TestCase):
        pass

    class PosixUidGidTests(unittest.TestCase):
        """Tests for the os.set*uid() / os.set*gid() family.

        Each test is defined only when os provides the function under test.
        """

        if hasattr(os, 'setuid'):
            def test_setuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setuid(0)
                with self.assertRaises(OverflowError):
                    os.setuid(1<<32)

        if hasattr(os, 'setgid'):
            def test_setgid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setgid(0)
                with self.assertRaises(OverflowError):
                    os.setgid(1<<32)

        if hasattr(os, 'seteuid'):
            def test_seteuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.seteuid(0)
                with self.assertRaises(OverflowError):
                    os.seteuid(1<<32)

        if hasattr(os, 'setegid'):
            def test_setegid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setegid(0)
                with self.assertRaises(OverflowError):
                    os.setegid(1<<32)

        if hasattr(os, 'setreuid'):
            def test_setreuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setreuid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(0, 1<<32)

            def test_setreuid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                code = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
                subprocess.check_call([sys.executable, '-c', code])

        if hasattr(os, 'setregid'):
            def test_setregid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setregid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(0, 1<<32)

            def test_setregid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                code = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
                subprocess.check_call([sys.executable, '-c', code])

    class Pep383Tests(unittest.TestCase):
        """listdir/open/stat on a directory holding non-ASCII file names."""

        def setUp(self):
            if support.TESTFN_UNENCODABLE:
                self.dir = support.TESTFN_UNENCODABLE
            else:
                self.dir = support.TESTFN
            self.bdir = os.fsencode(self.dir)

            # Collect the candidate names that os.fsencode() can encode.
            candidates = [support.TESTFN_UNICODE]
            if support.TESTFN_UNENCODABLE:
                candidates.append(support.TESTFN_UNENCODABLE)
            encoded_names = []
            for candidate in candidates:
                try:
                    encoded_names.append(os.fsencode(candidate))
                except UnicodeEncodeError:
                    continue
            if not encoded_names:
                self.skipTest("couldn't create any non-ascii filename")

            self.unicodefn = set()
            os.mkdir(self.dir)
            try:
                for raw_name in encoded_names:
                    support.create_empty_file(
                        os.path.join(self.bdir, raw_name))
                    decoded = os.fsdecode(raw_name)
                    if decoded in self.unicodefn:
                        raise ValueError("duplicate filename")
                    self.unicodefn.add(decoded)
            except BaseException:
                # Remove the directory before re-raising the error.
                shutil.rmtree(self.dir)
                raise

        def tearDown(self):
            shutil.rmtree(self.dir)

        def test_listdir(self):
            found = set(os.listdir(self.dir))
            self.assertEqual(found, self.unicodefn)

        def test_open(self):
            for fn in self.unicodefn:
                # Opening (and closing) the file is the whole check.
                with open(os.path.join(self.dir, fn), 'rb'):
                    pass

        def test_stat(self):
            for fn in self.unicodefn:
                os.stat(os.path.join(self.dir, fn))
else:
    # On Windows both classes are defined as empty TestCases.
    class PosixUidGidTests(unittest.TestCase):
        pass
    class Pep383Tests(unittest.TestCase):
        pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000988
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill(); the whole class is skipped off win32."""

    def _kill(self, sig):
        """Kill a ready child interpreter with *sig* and check its exit code."""
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        # The adjacent string literals are joined into one string before
        # .format(msg) runs, so '{}' in the second fragment receives msg.
        # The child writes msg to stdout, flushes, then blocks in input().
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to 100 times, sleeping 0.1s between attempts.
        count, max = 0, 100
        while count < max and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop condition turns false without a break:
            # either the attempts ran out or proc.poll() reported an exit.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and require it to exit.

        *name* is only used in the failure message.
        """
        # One-byte tagged mmap initialised to 0; the tag name is passed to
        # the child script on its command line.
        # NOTE(review): presumably win_console_handler.py sets the byte to 1
        # once it is initialised -- confirm against that script.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max = 0, 100
        while count < max and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # NOTE(review): `not proc.poll()` is true both while the child is
        # still running (None) and when it exited with status 0 -- confirm
        # that treating a zero exit status as "did not stop" is intended.
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1103
1104
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and stat behaviour on Windows symlinks."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions use unittest assertions rather than bare assert
        # statements so they are still enforced when running under -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is used because the link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """stat(link) must equal stat(target) and differ from lstat(link).

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        """os.stat() through a relative symlink works from several cwds."""
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Bound before the try block so the cleanup code can always refer
        # to it, even when one of the mkdir() calls fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so that a cleanup
            # error cannot mask the failure that aborted the test.
            if os.path.exists(file1):
                os.remove(file1)
            if os.path.exists(level1):
                shutil.rmtree(level1)
1222
Brian Curtind40e6f72010-07-08 21:39:08 +00001223
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        """bytes given to fsencode and str given to fsdecode come back equal."""
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        """fsdecode(fsencode(x)) == x for every sample that can be encoded."""
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This sample cannot be encoded here; move on to the next.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001237
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001238
class PidTests(unittest.TestCase):
    """Tests for process-id related os functions."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        """A child interpreter reports this process as its parent."""
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1248
1249
# This TestCase triggered at least two different errors on *nix buildbots
# when it was introduced; it is skipped for now so the buildbots can move
# along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Tests for os.getlogin() (currently skipped unconditionally)."""

    def test_getlogin(self):
        """os.getlogin() returns a non-empty name."""
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
1258
1259
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        """Raising the priority value by one is visible via getpriority()."""
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest(
      "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to put the original value back; an EACCES refusal is
            # tolerated, any other OSError propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001281 raise
1282
1283
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening asyncore dispatcher whose loop runs in its own thread.

        Each accepted connection is wrapped in a Handler, which greets the
        peer and stores every chunk of data it receives.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that buffers everything it receives."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting pushed to the peer as soon as it is connected.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return all the data received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True between the start of run() and a call to stop()."""
            return self._active

        def start(self):
            """Start the server thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to finish and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # The with statement releases the lock even when
                # asyncore.loop() raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        # Read events on the listening socket are handled the same way.
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1367
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001368
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() run against a local SendfileTestServer.

    setUpClass() writes DATA to support.TESTFN.  Every test gets a fresh
    server, a client socket connected to it and an open handle on the
    data file; the bytes collected by the server's handler are then
    compared with what the test asked sendfile() to transmit.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are only exercised on platforms other
    # than Linux and Solaris/SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        """Create the data file shared by all tests of the class."""
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        """Remove the shared data file."""
        support.unlink(support.TESTFN)

    def setUp(self):
        """Start a server, connect a client to it and open the data file."""
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        """Close the file and the client; stop the server if still running."""
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(),
                         trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Calls os.sendfile() and retries for as long as it fails with
        EAGAIN or EBUSY; any other OSError (ECONNRESET included) is
        propagated.  *headers* and *trailers* are forwarded only when
        SUPPORT_HEADERS_TRAILERS is true.  Their defaults are immutable
        empty tuples so that no mutable object is shared between calls.

        Returns whatever os.sendfile() returns.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    # disconnected (ECONNRESET) or an unexpected failure
                    raise
                # EAGAIN / EBUSY: we have to retry sending the data

    def _close_and_collect(self, shutdown=True):
        """Close the client, wait for the server and return the received data.

        When *shutdown* is true the socket is shut down in both
        directions before it is closed.
        """
        if shutdown:
            self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        return self.server.handler_instance.get_data()

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        data = self._close_and_collect()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        data = self._close_and_collect()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        data = self._close_and_collect()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # a negative offset must be rejected with EINVAL
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            # first call sends a header followed by file data, the rest of
            # the file is then sent without headers
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            data = self._close_and_collect(shutdown=False)
            # Compare the payload itself (length first, as the other
            # tests do) rather than only the hashes of both buffers.
            self.assertEqual(len(data), len(expected_data))
            self.assertEqual(data, expected_data)

        def test_trailers(self):
            TESTFN2 = support.TESTFN + "2"
            # Register the cleanup before the file exists so it is removed
            # even if writing or reopening it fails.
            self.addCleanup(support.unlink, TESTFN2)
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                data = self._close_and_collect(shutdown=False)
                self.assertEqual(data, b"abcde12345")

    if hasattr(os, "SF_NODISKIO"):
        def test_flags(self):
            # EBUSY and EAGAIN are tolerated; anything else is a failure
            try:
                os.sendfile(self.sockno, self.fileno, 0, 4096,
                            flags=os.SF_NODISKIO)
            except OSError as err:
                if err.errno not in (errno.EBUSY, errno.EAGAIN):
                    raise
1540
1541
def supports_extended_attributes():
    """Return True if extended attributes are usable for these tests.

    False is returned when os has no ``setxattr``, when setting an
    attribute on a scratch file fails with ENOTSUP, or when
    platform.release() reports a 2.6.x kernel older than 2.6.39.  Any
    other OSError raised by the probe is propagated.  The scratch file
    is always removed.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.fsetxattr(fp.fileno(), b"user.test", b"")
            except OSError as e:
                if e.errno != errno.ENOTSUP:
                    raise
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: "\d" is not a valid str escape, and a
    # bare "." would match any character, not just the version separator.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
1559
1560
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Round-trip tests for the get/set/remove/list xattr functions."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
        """Run the xattr scenario on a fresh support.TESTFN.

        *s* converts an attribute name (given as str) to the type under
        test; the four callables take the path as first argument.
        """
        path = support.TESTFN
        with open(path, "wb"):
            pass

        def expect_errno(code, func, *args):
            # func(*args) must fail with an OSError carrying *code*.
            with self.assertRaises(OSError) as caught:
                func(*args)
            self.assertEqual(caught.exception.errno, code)

        # Nothing set yet.
        expect_errno(errno.ENODATA, getxattr, path, s("user.test"))
        init_xattr = listxattr(path)
        self.assertIsInstance(init_xattr, list)

        # Create an empty attribute, then replace its value.
        setxattr(path, s("user.test"), b"")
        expected = set(init_xattr) | {"user.test"}
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test"), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE)
        self.assertEqual(getxattr(path, b"user.test"), b"hello")

        # XATTR_CREATE on an existing name and XATTR_REPLACE on a missing
        # one must both fail.
        expect_errno(errno.EEXIST,
                     setxattr, path, s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA,
                     setxattr, path, s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removal makes the attribute disappear again.
        removexattr(path, s("user.test"))
        expect_errno(errno.ENODATA, getxattr, path, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2")), b"foo")

        # A 1024-byte value.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value)
        self.assertEqual(getxattr(path, s("user.test")), big_value)
        removexattr(path, s("user.test"))

        # One hundred attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for name in many:
            setxattr(path, name, b"x")
        self.assertEqual(set(listxattr(path)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args):
        """Run the scenario with str names, then again with bytes names."""
        def to_bytes(text):
            return text.encode("ascii")
        self._check_xattrs_str(str, *args)
        # Start the second pass from a file without leftover attributes.
        support.unlink(support.TESTFN)
        self._check_xattrs_str(to_bytes, *args)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
                           os.llistxattr)

    def test_fds(self):
        def via_fd(func, mode):
            # Adapt a descriptor-based function to the path-based
            # signature expected by _check_xattrs().
            def wrapper(path, *args):
                with open(path, mode) as fp:
                    return func(fp.fileno(), *args)
            return wrapper

        self._check_xattrs(via_fd(os.fgetxattr, "rb"),
                           via_fd(os.fsetxattr, "wb"),
                           via_fd(os.fremovexattr, "wb"),
                           via_fd(os.flistxattr, "rb"))
1636
1637
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Check that bytes filenames raise DeprecationWarning on Windows."""

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            # Turn the warning into an exception so it can be asserted on.
            warnings.simplefilter("error", DeprecationWarning)
            calls = (
                (nt._getfullpathname, filename),
                (nt._isdir, filename),
                (os.access, filename, os.R_OK),
                (os.chdir, filename),
                (os.chmod, filename, 0o777),
                (os.getcwdb,),
                (os.link, filename, filename),
                (os.listdir, filename),
                (os.lstat, filename),
                (os.mkdir, filename),
                (os.open, filename, os.O_RDONLY),
                (os.rename, filename, filename),
                (os.rmdir, filename),
                (os.startfile, filename),
                (os.stat, filename),
                (os.unlink, filename),
                (os.utime, filename),
            )
            for func, *args in calls:
                with self.assertRaises(DeprecationWarning):
                    func(*args)

    @support.skip_unless_symlink
    def test_symlink(self):
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(filename, filename)
1673
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001674
@support.reap_threads
def test_main():
    """Hand every test case class of this module to support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
    )
    support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()