blob: da143cf3c073408a280a4f271d4120ea6edef434 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
23try:
24 import threading
25except ImportError:
26 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000027
Mark Dickinson7cf03892010-04-16 13:45:35 +000028# Detect whether we're on a Linux system that uses the (now outdated
29# and unmaintained) linuxthreads threading library. There's an issue
30# when combining linuxthreads with a failed execv call: see
31# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020032if hasattr(sys, 'thread_info') and sys.thread_info.version:
33 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
34else:
35 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000036
Thomas Wouters0e3f5912006-08-11 14:57:12 +000037# Tests creating TESTFN
38class FileTests(unittest.TestCase):
39 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040 if os.path.exists(support.TESTFN):
41 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000042 tearDown = setUp
43
44 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000045 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000046 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000047 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000048
Christian Heimesfdab48e2008-01-20 09:06:41 +000049 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000050 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
51 # We must allocate two consecutive file descriptors, otherwise
52 # it will mess up other file descriptors (perhaps even the three
53 # standard ones).
54 second = os.dup(first)
55 try:
56 retries = 0
57 while second != first + 1:
58 os.close(first)
59 retries += 1
60 if retries > 10:
61 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000062 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000063 first, second = second, os.dup(second)
64 finally:
65 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000066 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000067 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000068 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000069
Benjamin Peterson1cc6df92010-06-30 17:39:45 +000070 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +000071 def test_rename(self):
72 path = support.TESTFN
73 old = sys.getrefcount(path)
74 self.assertRaises(TypeError, os.rename, path, 0)
75 new = sys.getrefcount(path)
76 self.assertEqual(old, new)
77
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000078 def test_read(self):
79 with open(support.TESTFN, "w+b") as fobj:
80 fobj.write(b"spam")
81 fobj.flush()
82 fd = fobj.fileno()
83 os.lseek(fd, 0, 0)
84 s = os.read(fd, 4)
85 self.assertEqual(type(s), bytes)
86 self.assertEqual(s, b"spam")
87
88 def test_write(self):
89 # os.write() accepts bytes- and buffer-like objects but not strings
90 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
91 self.assertRaises(TypeError, os.write, fd, "beans")
92 os.write(fd, b"bacon\n")
93 os.write(fd, bytearray(b"eggs\n"))
94 os.write(fd, memoryview(b"spam\n"))
95 os.close(fd)
96 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +000097 self.assertEqual(fobj.read().splitlines(),
98 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000099
Victor Stinnere0daff12011-03-20 23:36:35 +0100100 def write_windows_console(self, *args):
101 retcode = subprocess.call(args,
102 # use a new console to not flood the test output
103 creationflags=subprocess.CREATE_NEW_CONSOLE,
104 # use a shell to hide the console window (SW_HIDE)
105 shell=True)
106 self.assertEqual(retcode, 0)
107
108 @unittest.skipUnless(sys.platform == 'win32',
109 'test specific to the Windows console')
110 def test_write_windows_console(self):
111 # Issue #11395: the Windows console returns an error (12: not enough
112 # space error) on writing into stdout if stdout mode is binary and the
113 # length is greater than 66,000 bytes (or less, depending on heap
114 # usage).
115 code = "print('x' * 100000)"
116 self.write_windows_console(sys.executable, "-c", code)
117 self.write_windows_console(sys.executable, "-u", "-c", code)
118
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000119 def fdopen_helper(self, *args):
120 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200121 f = os.fdopen(fd, *args)
122 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000123
124 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200125 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
126 os.close(fd)
127
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000128 self.fdopen_helper()
129 self.fdopen_helper('r')
130 self.fdopen_helper('r', 100)
131
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200132
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000133# Test attributes on return values from os.*stat* family.
134class StatAttributeTests(unittest.TestCase):
135 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000136 os.mkdir(support.TESTFN)
137 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000138 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000139 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000140 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000141
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000142 def tearDown(self):
143 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000144 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000145
Antoine Pitrou38425292010-09-21 18:19:07 +0000146 def check_stat_attributes(self, fname):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000147 if not hasattr(os, "stat"):
148 return
149
150 import stat
Antoine Pitrou38425292010-09-21 18:19:07 +0000151 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000152
153 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000154 self.assertEqual(result[stat.ST_SIZE], 3)
155 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000156
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000157 # Make sure all the attributes are there
158 members = dir(result)
159 for name in dir(stat):
160 if name[:3] == 'ST_':
161 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000162 if name.endswith("TIME"):
163 def trunc(x): return int(x)
164 else:
165 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000166 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000167 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000168 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000169
170 try:
171 result[200]
172 self.fail("No exception thrown")
173 except IndexError:
174 pass
175
176 # Make sure that assignment fails
177 try:
178 result.st_mode = 1
179 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000180 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000181 pass
182
183 try:
184 result.st_rdev = 1
185 self.fail("No exception thrown")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000186 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000187 pass
188
189 try:
190 result.parrot = 1
191 self.fail("No exception thrown")
192 except AttributeError:
193 pass
194
195 # Use the stat_result constructor with a too-short tuple.
196 try:
197 result2 = os.stat_result((10,))
198 self.fail("No exception thrown")
199 except TypeError:
200 pass
201
Ezio Melotti42da6632011-03-15 05:18:48 +0200202 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000203 try:
204 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
205 except TypeError:
206 pass
207
Antoine Pitrou38425292010-09-21 18:19:07 +0000208 def test_stat_attributes(self):
209 self.check_stat_attributes(self.fname)
210
211 def test_stat_attributes_bytes(self):
212 try:
213 fname = self.fname.encode(sys.getfilesystemencoding())
214 except UnicodeEncodeError:
215 self.skipTest("cannot encode %a for the filesystem" % self.fname)
216 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000217
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000218 def test_statvfs_attributes(self):
219 if not hasattr(os, "statvfs"):
220 return
221
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000222 try:
223 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000224 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000225 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000226 if e.errno == errno.ENOSYS:
227 return
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000228
229 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000230 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000231
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000232 # Make sure all the attributes are there.
233 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
234 'ffree', 'favail', 'flag', 'namemax')
235 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000236 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000237
238 # Make sure that assignment really fails
239 try:
240 result.f_bfree = 1
241 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000242 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000243 pass
244
245 try:
246 result.parrot = 1
247 self.fail("No exception thrown")
248 except AttributeError:
249 pass
250
251 # Use the constructor with a too-short tuple.
252 try:
253 result2 = os.statvfs_result((10,))
254 self.fail("No exception thrown")
255 except TypeError:
256 pass
257
Ezio Melotti42da6632011-03-15 05:18:48 +0200258 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259 try:
260 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
261 except TypeError:
262 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000263
Thomas Wouters89f507f2006-12-13 04:49:30 +0000264 def test_utime_dir(self):
265 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000266 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000267 # round to int, because some systems may support sub-second
268 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000269 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
270 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000271 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000272
Brian Curtin52fbea12011-11-06 13:41:17 -0600273 def test_utime_noargs(self):
Brian Curtin0277aa32011-11-06 13:50:15 -0600274 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600275 # second argument. Check that the previous methods of passing
276 # a time tuple or None work in addition to no argument.
277 st = os.stat(support.TESTFN)
278 # Doesn't set anything new, but sets the time tuple way
279 os.utime(support.TESTFN, (st.st_atime, st.st_mtime))
280 # Set to the current time in the old explicit way.
281 os.utime(support.TESTFN, None)
282 st1 = os.stat(support.TESTFN)
283 # Set to the current time in the new way
284 os.utime(support.TESTFN)
285 st2 = os.stat(support.TESTFN)
286 self.assertAlmostEqual(st1.st_mtime, st2.st_mtime, delta=10)
287
Thomas Wouters89f507f2006-12-13 04:49:30 +0000288 # Restrict test to Win32, since there is no guarantee other
289 # systems support centiseconds
290 if sys.platform == 'win32':
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000291 def get_file_system(path):
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000292 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000293 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000294 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000295 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000296 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000297 return buf.value
298
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000299 if get_file_system(support.TESTFN) == "NTFS":
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000300 def test_1565150(self):
301 t1 = 1159195039.25
302 os.utime(self.fname, (t1, t1))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000303 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000304
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000305 def test_large_time(self):
306 t1 = 5000000000 # some day in 2128
307 os.utime(self.fname, (t1, t1))
308 self.assertEqual(os.stat(self.fname).st_mtime, t1)
309
Guido van Rossumd8faa362007-04-27 19:54:29 +0000310 def test_1686475(self):
311 # Verify that an open file can be stat'ed
312 try:
313 os.stat(r"c:\pagefile.sys")
314 except WindowsError as e:
Benjamin Petersonc4fe6f32008-08-19 18:57:56 +0000315 if e.errno == 2: # file does not exist; cannot run test
Guido van Rossumd8faa362007-04-27 19:54:29 +0000316 return
317 self.fail("Could not stat pagefile.sys")
318
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000319from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000320
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000321class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000322 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000323 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000324
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000325 def setUp(self):
326 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000327 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000328 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000329 for key, value in self._reference().items():
330 os.environ[key] = value
331
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000332 def tearDown(self):
333 os.environ.clear()
334 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000335 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000336 os.environb.clear()
337 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000338
Christian Heimes90333392007-11-01 19:08:42 +0000339 def _reference(self):
340 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
341
342 def _empty_mapping(self):
343 os.environ.clear()
344 return os.environ
345
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000346 # Bug 1110478
Martin v. Löwis5510f652005-02-17 21:23:20 +0000347 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000348 os.environ.clear()
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000349 if os.path.exists("/bin/sh"):
350 os.environ.update(HELLO="World")
Brian Curtin810921b2010-10-30 21:24:21 +0000351 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
352 value = popen.read().strip()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000353 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000354
Christian Heimes1a13d592007-11-08 14:16:55 +0000355 def test_os_popen_iter(self):
356 if os.path.exists("/bin/sh"):
Brian Curtin810921b2010-10-30 21:24:21 +0000357 with os.popen(
358 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
359 it = iter(popen)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000360 self.assertEqual(next(it), "line1\n")
361 self.assertEqual(next(it), "line2\n")
362 self.assertEqual(next(it), "line3\n")
Brian Curtin810921b2010-10-30 21:24:21 +0000363 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000364
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000365 # Verify environ keys and values from the OS are of the
366 # correct str type.
367 def test_keyvalue_types(self):
368 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000369 self.assertEqual(type(key), str)
370 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000371
Christian Heimes90333392007-11-01 19:08:42 +0000372 def test_items(self):
373 for key, value in self._reference().items():
374 self.assertEqual(os.environ.get(key), value)
375
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000376 # Issue 7310
377 def test___repr__(self):
378 """Check that the repr() of os.environ looks like environ({...})."""
379 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000380 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
381 '{!r}: {!r}'.format(key, value)
382 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000383
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000384 def test_get_exec_path(self):
385 defpath_list = os.defpath.split(os.pathsep)
386 test_path = ['/monty', '/python', '', '/flying/circus']
387 test_env = {'PATH': os.pathsep.join(test_path)}
388
389 saved_environ = os.environ
390 try:
391 os.environ = dict(test_env)
392 # Test that defaulting to os.environ works.
393 self.assertSequenceEqual(test_path, os.get_exec_path())
394 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
395 finally:
396 os.environ = saved_environ
397
398 # No PATH environment variable
399 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
400 # Empty PATH environment variable
401 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
402 # Supplied PATH environment variable
403 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
404
Victor Stinnerb745a742010-05-18 17:17:23 +0000405 if os.supports_bytes_environ:
406 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000407 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000408 # ignore BytesWarning warning
409 with warnings.catch_warnings(record=True):
410 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000411 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000412 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000413 pass
414 else:
415 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000416
417 # bytes key and/or value
418 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
419 ['abc'])
420 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
421 ['abc'])
422 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
423 ['abc'])
424
425 @unittest.skipUnless(os.supports_bytes_environ,
426 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000427 def test_environb(self):
428 # os.environ -> os.environb
429 value = 'euro\u20ac'
430 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000431 value_bytes = value.encode(sys.getfilesystemencoding(),
432 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000433 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000434 msg = "U+20AC character is not encodable to %s" % (
435 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000436 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000437 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000438 self.assertEqual(os.environ['unicode'], value)
439 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000440
441 # os.environb -> os.environ
442 value = b'\xff'
443 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000444 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000445 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000446 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000447
Tim Petersc4e09402003-04-25 07:11:48 +0000448class WalkTests(unittest.TestCase):
449 """Tests for os.walk()."""
450
451 def test_traversal(self):
452 import os
453 from os.path import join
454
455 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000456 # TESTFN/
457 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000458 # tmp1
459 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000460 # tmp2
461 # SUB11/ no kids
462 # SUB2/ a file kid and a dirsymlink kid
463 # tmp3
464 # link/ a symlink to TESTFN.2
465 # TEST2/
466 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000467 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000468 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000469 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000470 sub2_path = join(walk_path, "SUB2")
471 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000472 tmp2_path = join(sub1_path, "tmp2")
473 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000474 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000475 t2_path = join(support.TESTFN, "TEST2")
476 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Tim Petersc4e09402003-04-25 07:11:48 +0000477
478 # Create stuff.
479 os.makedirs(sub11_path)
480 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000481 os.makedirs(t2_path)
482 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000483 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000484 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
485 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000486 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000487 os.symlink(os.path.abspath(t2_path), link_path)
488 sub2_tree = (sub2_path, ["link"], ["tmp3"])
489 else:
490 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000491
492 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000493 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000494 self.assertEqual(len(all), 4)
495 # We can't know which order SUB1 and SUB2 will appear in.
496 # Not flipped: TESTFN, SUB1, SUB11, SUB2
497 # flipped: TESTFN, SUB2, SUB1, SUB11
498 flipped = all[0][1][0] != "SUB1"
499 all[0][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000500 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000501 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
502 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000503 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000504
505 # Prune the search.
506 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000507 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000508 all.append((root, dirs, files))
509 # Don't descend into SUB1.
510 if 'SUB1' in dirs:
511 # Note that this also mutates the dirs we appended to all!
512 dirs.remove('SUB1')
513 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000514 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
515 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000516
517 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000518 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000519 self.assertEqual(len(all), 4)
520 # We can't know which order SUB1 and SUB2 will appear in.
521 # Not flipped: SUB11, SUB1, SUB2, TESTFN
522 # flipped: SUB2, SUB11, SUB1, TESTFN
523 flipped = all[3][1][0] != "SUB1"
524 all[3][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000525 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000526 self.assertEqual(all[flipped], (sub11_path, [], []))
527 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000528 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000529
Brian Curtin3b4499c2010-12-28 14:31:47 +0000530 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000531 # Walk, following symlinks.
532 for root, dirs, files in os.walk(walk_path, followlinks=True):
533 if root == link_path:
534 self.assertEqual(dirs, [])
535 self.assertEqual(files, ["tmp4"])
536 break
537 else:
538 self.fail("Didn't follow symlink with followlinks=True")
539
540 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000541 # Tear everything down. This is a decent use for bottom-up on
542 # Windows, which doesn't have a recursive delete command. The
543 # (not so) subtlety is that rmdir will fail unless the dir's
544 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000545 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000546 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000547 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000548 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000549 dirname = os.path.join(root, name)
550 if not os.path.islink(dirname):
551 os.rmdir(dirname)
552 else:
553 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000554 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000555
Guido van Rossume7ba4952007-06-06 23:52:48 +0000556class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000557 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000558 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000559
560 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000561 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000562 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
563 os.makedirs(path) # Should work
564 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
565 os.makedirs(path)
566
567 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000568 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000569 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
570 os.makedirs(path)
571 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
572 'dir5', 'dir6')
573 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000574
Terry Reedy5a22b652010-12-02 07:05:56 +0000575 def test_exist_ok_existing_directory(self):
576 path = os.path.join(support.TESTFN, 'dir1')
577 mode = 0o777
578 old_mask = os.umask(0o022)
579 os.makedirs(path, mode)
580 self.assertRaises(OSError, os.makedirs, path, mode)
581 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
582 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
583 os.makedirs(path, mode=mode, exist_ok=True)
584 os.umask(old_mask)
585
586 def test_exist_ok_existing_regular_file(self):
587 base = support.TESTFN
588 path = os.path.join(support.TESTFN, 'dir1')
589 f = open(path, 'w')
590 f.write('abc')
591 f.close()
592 self.assertRaises(OSError, os.makedirs, path)
593 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
594 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
595 os.remove(path)
596
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000597 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000598 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000599 'dir4', 'dir5', 'dir6')
600 # If the tests failed, the bottom-most directory ('../dir6')
601 # may not have been created, so we look for the outermost directory
602 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000603 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000604 path = os.path.dirname(path)
605
606 os.removedirs(path)
607
Guido van Rossume7ba4952007-06-06 23:52:48 +0000608class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000609 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +0200610 with open(os.devnull, 'wb') as f:
611 f.write(b'hello')
612 f.close()
613 with open(os.devnull, 'rb') as f:
614 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000615
Guido van Rossume7ba4952007-06-06 23:52:48 +0000616class URandomTests(unittest.TestCase):
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000617 def test_urandom(self):
618 try:
619 self.assertEqual(len(os.urandom(1)), 1)
620 self.assertEqual(len(os.urandom(10)), 10)
621 self.assertEqual(len(os.urandom(100)), 100)
622 self.assertEqual(len(os.urandom(1000)), 1000)
623 except NotImplementedError:
624 pass
625
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000626@contextlib.contextmanager
627def _execvpe_mockup(defpath=None):
628 """
629 Stubs out execv and execve functions when used as context manager.
630 Records exec calls. The mock execv and execve functions always raise an
631 exception as they would normally never return.
632 """
633 # A list of tuples containing (function name, first arg, args)
634 # of calls to execv or execve that have been made.
635 calls = []
636
637 def mock_execv(name, *args):
638 calls.append(('execv', name, args))
639 raise RuntimeError("execv called")
640
641 def mock_execve(name, *args):
642 calls.append(('execve', name, args))
643 raise OSError(errno.ENOTDIR, "execve called")
644
645 try:
646 orig_execv = os.execv
647 orig_execve = os.execve
648 orig_defpath = os.defpath
649 os.execv = mock_execv
650 os.execve = mock_execve
651 if defpath is not None:
652 os.defpath = defpath
653 yield calls
654 finally:
655 os.execv = orig_execv
656 os.execve = orig_execve
657 os.defpath = orig_defpath
658
Guido van Rossume7ba4952007-06-06 23:52:48 +0000659class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000660 @unittest.skipIf(USING_LINUXTHREADS,
661 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +0000662 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000663 self.assertRaises(OSError, os.execvpe, 'no such app-',
664 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +0000665
Thomas Heller6790d602007-08-30 17:15:14 +0000666 def test_execvpe_with_bad_arglist(self):
667 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
668
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000669 @unittest.skipUnless(hasattr(os, '_execvpe'),
670 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +0000671 def _test_internal_execvpe(self, test_type):
672 program_path = os.sep + 'absolutepath'
673 if test_type is bytes:
674 program = b'executable'
675 fullpath = os.path.join(os.fsencode(program_path), program)
676 native_fullpath = fullpath
677 arguments = [b'progname', 'arg1', 'arg2']
678 else:
679 program = 'executable'
680 arguments = ['progname', 'arg1', 'arg2']
681 fullpath = os.path.join(program_path, program)
682 if os.name != "nt":
683 native_fullpath = os.fsencode(fullpath)
684 else:
685 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000686 env = {'spam': 'beans'}
687
Victor Stinnerb745a742010-05-18 17:17:23 +0000688 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000689 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000690 self.assertRaises(RuntimeError,
691 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000692 self.assertEqual(len(calls), 1)
693 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
694
Victor Stinnerb745a742010-05-18 17:17:23 +0000695 # test os._execvpe() with a relative path:
696 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000697 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000698 self.assertRaises(OSError,
699 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000700 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +0000701 self.assertSequenceEqual(calls[0],
702 ('execve', native_fullpath, (arguments, env)))
703
704 # test os._execvpe() with a relative path:
705 # os.get_exec_path() reads the 'PATH' variable
706 with _execvpe_mockup() as calls:
707 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +0000708 if test_type is bytes:
709 env_path[b'PATH'] = program_path
710 else:
711 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +0000712 self.assertRaises(OSError,
713 os._execvpe, program, arguments, env=env_path)
714 self.assertEqual(len(calls), 1)
715 self.assertSequenceEqual(calls[0],
716 ('execve', native_fullpath, (arguments, env_path)))
717
718 def test_internal_execvpe_str(self):
719 self._test_internal_execvpe(str)
720 if os.name != "nt":
721 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000722
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000723
Thomas Wouters477c8d52006-05-27 19:21:47 +0000724class Win32ErrorTests(unittest.TestCase):
725 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000726 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000727
728 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000729 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000730
731 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000732 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000733
734 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000735 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +0000736 try:
737 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
738 finally:
739 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000740 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000741
742 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000743 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000744
Thomas Wouters477c8d52006-05-27 19:21:47 +0000745 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +0000746 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000747
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000748class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000749 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000750 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
751 #singles.append("close")
752 #We omit close because it doesn'r raise an exception on some platforms
753 def get_single(f):
754 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000755 if hasattr(os, f):
756 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000757 return helper
758 for f in singles:
759 locals()["test_"+f] = get_single(f)
760
Benjamin Peterson7522c742009-01-19 21:00:09 +0000761 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000762 try:
763 f(support.make_bad_fd(), *args)
764 except OSError as e:
765 self.assertEqual(e.errno, errno.EBADF)
766 else:
767 self.fail("%r didn't raise a OSError with a bad file descriptor"
768 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000769
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000770 def test_isatty(self):
771 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000772 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000773
774 def test_closerange(self):
775 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000776 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000777 # Make sure none of the descriptors we are about to close are
778 # currently valid (issue 6542).
779 for i in range(10):
780 try: os.fstat(fd+i)
781 except OSError:
782 pass
783 else:
784 break
785 if i < 2:
786 raise unittest.SkipTest(
787 "Unable to acquire a range of invalid file descriptors")
788 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000789
790 def test_dup2(self):
791 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000792 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000793
794 def test_fchmod(self):
795 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000796 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000797
798 def test_fchown(self):
799 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000800 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000801
802 def test_fpathconf(self):
803 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000804 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000805
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000806 def test_ftruncate(self):
807 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000808 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000809
810 def test_lseek(self):
811 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000812 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000813
814 def test_read(self):
815 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000816 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000817
818 def test_tcsetpgrpt(self):
819 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000820 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000821
822 def test_write(self):
823 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000824 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000825
Brian Curtin1b9df392010-11-24 20:24:31 +0000826
827class LinkTests(unittest.TestCase):
828 def setUp(self):
829 self.file1 = support.TESTFN
830 self.file2 = os.path.join(support.TESTFN + "2")
831
Brian Curtinc0abc4e2010-11-30 23:46:54 +0000832 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +0000833 for file in (self.file1, self.file2):
834 if os.path.exists(file):
835 os.unlink(file)
836
Brian Curtin1b9df392010-11-24 20:24:31 +0000837 def _test_link(self, file1, file2):
838 with open(file1, "w") as f1:
839 f1.write("test")
840
841 os.link(file1, file2)
842 with open(file1, "r") as f1, open(file2, "r") as f2:
843 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
844
845 def test_link(self):
846 self._test_link(self.file1, self.file2)
847
848 def test_link_bytes(self):
849 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
850 bytes(self.file2, sys.getfilesystemencoding()))
851
Brian Curtinf498b752010-11-30 15:54:04 +0000852 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +0000853 try:
Brian Curtinf498b752010-11-30 15:54:04 +0000854 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +0000855 except UnicodeError:
856 raise unittest.SkipTest("Unable to encode for this platform.")
857
Brian Curtinf498b752010-11-30 15:54:04 +0000858 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +0000859 self.file2 = self.file1 + "2"
860 self._test_link(self.file1, self.file2)
861
Thomas Wouters477c8d52006-05-27 19:21:47 +0000862if sys.platform != 'win32':
863 class Win32ErrorTests(unittest.TestCase):
864 pass
865
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000866 class PosixUidGidTests(unittest.TestCase):
867 if hasattr(os, 'setuid'):
868 def test_setuid(self):
869 if os.getuid() != 0:
870 self.assertRaises(os.error, os.setuid, 0)
871 self.assertRaises(OverflowError, os.setuid, 1<<32)
872
873 if hasattr(os, 'setgid'):
874 def test_setgid(self):
875 if os.getuid() != 0:
876 self.assertRaises(os.error, os.setgid, 0)
877 self.assertRaises(OverflowError, os.setgid, 1<<32)
878
879 if hasattr(os, 'seteuid'):
880 def test_seteuid(self):
881 if os.getuid() != 0:
882 self.assertRaises(os.error, os.seteuid, 0)
883 self.assertRaises(OverflowError, os.seteuid, 1<<32)
884
885 if hasattr(os, 'setegid'):
886 def test_setegid(self):
887 if os.getuid() != 0:
888 self.assertRaises(os.error, os.setegid, 0)
889 self.assertRaises(OverflowError, os.setegid, 1<<32)
890
891 if hasattr(os, 'setreuid'):
892 def test_setreuid(self):
893 if os.getuid() != 0:
894 self.assertRaises(os.error, os.setreuid, 0, 0)
895 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
896 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000897
898 def test_setreuid_neg1(self):
899 # Needs to accept -1. We run this in a subprocess to avoid
900 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000901 subprocess.check_call([
902 sys.executable, '-c',
903 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000904
905 if hasattr(os, 'setregid'):
906 def test_setregid(self):
907 if os.getuid() != 0:
908 self.assertRaises(os.error, os.setregid, 0, 0)
909 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
910 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000911
912 def test_setregid_neg1(self):
913 # Needs to accept -1. We run this in a subprocess to avoid
914 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000915 subprocess.check_call([
916 sys.executable, '-c',
917 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +0000918
919 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +0000920 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000921 if support.TESTFN_UNENCODABLE:
922 self.dir = support.TESTFN_UNENCODABLE
923 else:
924 self.dir = support.TESTFN
925 self.bdir = os.fsencode(self.dir)
926
927 bytesfn = []
928 def add_filename(fn):
929 try:
930 fn = os.fsencode(fn)
931 except UnicodeEncodeError:
932 return
933 bytesfn.append(fn)
934 add_filename(support.TESTFN_UNICODE)
935 if support.TESTFN_UNENCODABLE:
936 add_filename(support.TESTFN_UNENCODABLE)
937 if not bytesfn:
938 self.skipTest("couldn't create any non-ascii filename")
939
940 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +0000941 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000942 try:
943 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +0200944 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +0000945 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000946 if fn in self.unicodefn:
947 raise ValueError("duplicate filename")
948 self.unicodefn.add(fn)
949 except:
950 shutil.rmtree(self.dir)
951 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +0000952
953 def tearDown(self):
954 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +0000955
956 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000957 expected = self.unicodefn
958 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000959 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +0000960
961 def test_open(self):
962 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +0200963 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +0000964 f.close()
965
966 def test_stat(self):
967 for fn in self.unicodefn:
968 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000969else:
970 class PosixUidGidTests(unittest.TestCase):
971 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +0000972 class Pep383Tests(unittest.TestCase):
973 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000974
Brian Curtineb24d742010-04-12 17:16:38 +0000975@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
976class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +0000977 def _kill(self, sig):
978 # Start sys.executable as a subprocess and communicate from the
979 # subprocess to the parent that the interpreter is ready. When it
980 # becomes ready, send *sig* via os.kill to the subprocess and check
981 # that the return code is equal to *sig*.
982 import ctypes
983 from ctypes import wintypes
984 import msvcrt
985
986 # Since we can't access the contents of the process' stdout until the
987 # process has exited, use PeekNamedPipe to see what's inside stdout
988 # without waiting. This is done so we can tell that the interpreter
989 # is started and running at a point where it could handle a signal.
990 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
991 PeekNamedPipe.restype = wintypes.BOOL
992 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
993 ctypes.POINTER(ctypes.c_char), # stdout buf
994 wintypes.DWORD, # Buffer size
995 ctypes.POINTER(wintypes.DWORD), # bytes read
996 ctypes.POINTER(wintypes.DWORD), # bytes avail
997 ctypes.POINTER(wintypes.DWORD)) # bytes left
998 msg = "running"
999 proc = subprocess.Popen([sys.executable, "-c",
1000 "import sys;"
1001 "sys.stdout.write('{}');"
1002 "sys.stdout.flush();"
1003 "input()".format(msg)],
1004 stdout=subprocess.PIPE,
1005 stderr=subprocess.PIPE,
1006 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001007 self.addCleanup(proc.stdout.close)
1008 self.addCleanup(proc.stderr.close)
1009 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001010
1011 count, max = 0, 100
1012 while count < max and proc.poll() is None:
1013 # Create a string buffer to store the result of stdout from the pipe
1014 buf = ctypes.create_string_buffer(len(msg))
1015 # Obtain the text currently in proc.stdout
1016 # Bytes read/avail/left are left as NULL and unused
1017 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1018 buf, ctypes.sizeof(buf), None, None, None)
1019 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1020 if buf.value:
1021 self.assertEqual(msg, buf.value.decode())
1022 break
1023 time.sleep(0.1)
1024 count += 1
1025 else:
1026 self.fail("Did not receive communication from the subprocess")
1027
Brian Curtineb24d742010-04-12 17:16:38 +00001028 os.kill(proc.pid, sig)
1029 self.assertEqual(proc.wait(), sig)
1030
1031 def test_kill_sigterm(self):
1032 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001033 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001034
1035 def test_kill_int(self):
1036 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001037 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001038
1039 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001040 tagname = "test_os_%s" % uuid.uuid1()
1041 m = mmap.mmap(-1, 1, tagname)
1042 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001043 # Run a script which has console control handling enabled.
1044 proc = subprocess.Popen([sys.executable,
1045 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001046 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001047 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1048 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001049 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001050 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001051 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001052 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001053 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001054 count += 1
1055 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001056 # Forcefully kill the process if we weren't able to signal it.
1057 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001058 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001059 os.kill(proc.pid, event)
1060 # proc.send_signal(event) could also be done here.
1061 # Allow time for the signal to be passed and the process to exit.
1062 time.sleep(0.5)
1063 if not proc.poll():
1064 # Forcefully kill the process if we weren't able to signal it.
1065 os.kill(proc.pid, signal.SIGINT)
1066 self.fail("subprocess did not stop on {}".format(name))
1067
1068 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1069 def test_CTRL_C_EVENT(self):
1070 from ctypes import wintypes
1071 import ctypes
1072
1073 # Make a NULL value by creating a pointer with no argument.
1074 NULL = ctypes.POINTER(ctypes.c_int)()
1075 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1076 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1077 wintypes.BOOL)
1078 SetConsoleCtrlHandler.restype = wintypes.BOOL
1079
1080 # Calling this with NULL and FALSE causes the calling process to
1081 # handle CTRL+C, rather than ignore it. This property is inherited
1082 # by subprocesses.
1083 SetConsoleCtrlHandler(NULL, 0)
1084
1085 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1086
1087 def test_CTRL_BREAK_EVENT(self):
1088 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1089
1090
Brian Curtind40e6f72010-07-08 21:39:08 +00001091@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001092@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001093class Win32SymlinkTests(unittest.TestCase):
1094 filelink = 'filelinktest'
1095 filelink_target = os.path.abspath(__file__)
1096 dirlink = 'dirlinktest'
1097 dirlink_target = os.path.dirname(filelink_target)
1098 missing_link = 'missing link'
1099
1100 def setUp(self):
1101 assert os.path.exists(self.dirlink_target)
1102 assert os.path.exists(self.filelink_target)
1103 assert not os.path.exists(self.dirlink)
1104 assert not os.path.exists(self.filelink)
1105 assert not os.path.exists(self.missing_link)
1106
1107 def tearDown(self):
1108 if os.path.exists(self.filelink):
1109 os.remove(self.filelink)
1110 if os.path.exists(self.dirlink):
1111 os.rmdir(self.dirlink)
1112 if os.path.lexists(self.missing_link):
1113 os.remove(self.missing_link)
1114
1115 def test_directory_link(self):
1116 os.symlink(self.dirlink_target, self.dirlink)
1117 self.assertTrue(os.path.exists(self.dirlink))
1118 self.assertTrue(os.path.isdir(self.dirlink))
1119 self.assertTrue(os.path.islink(self.dirlink))
1120 self.check_stat(self.dirlink, self.dirlink_target)
1121
1122 def test_file_link(self):
1123 os.symlink(self.filelink_target, self.filelink)
1124 self.assertTrue(os.path.exists(self.filelink))
1125 self.assertTrue(os.path.isfile(self.filelink))
1126 self.assertTrue(os.path.islink(self.filelink))
1127 self.check_stat(self.filelink, self.filelink_target)
1128
1129 def _create_missing_dir_link(self):
1130 'Create a "directory" link to a non-existent target'
1131 linkname = self.missing_link
1132 if os.path.lexists(linkname):
1133 os.remove(linkname)
1134 target = r'c:\\target does not exist.29r3c740'
1135 assert not os.path.exists(target)
1136 target_is_dir = True
1137 os.symlink(target, linkname, target_is_dir)
1138
1139 def test_remove_directory_link_to_missing_target(self):
1140 self._create_missing_dir_link()
1141 # For compatibility with Unix, os.remove will check the
1142 # directory status and call RemoveDirectory if the symlink
1143 # was created with target_is_dir==True.
1144 os.remove(self.missing_link)
1145
1146 @unittest.skip("currently fails; consider for improvement")
1147 def test_isdir_on_directory_link_to_missing_target(self):
1148 self._create_missing_dir_link()
1149 # consider having isdir return true for directory links
1150 self.assertTrue(os.path.isdir(self.missing_link))
1151
1152 @unittest.skip("currently fails; consider for improvement")
1153 def test_rmdir_on_directory_link_to_missing_target(self):
1154 self._create_missing_dir_link()
1155 # consider allowing rmdir to remove directory links
1156 os.rmdir(self.missing_link)
1157
1158 def check_stat(self, link, target):
1159 self.assertEqual(os.stat(link), os.stat(target))
1160 self.assertNotEqual(os.lstat(link), os.stat(link))
1161
Brian Curtind25aef52011-06-13 15:16:04 -05001162 bytes_link = os.fsencode(link)
1163 self.assertEqual(os.stat(bytes_link), os.stat(target))
1164 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
1165
1166 def test_12084(self):
1167 level1 = os.path.abspath(support.TESTFN)
1168 level2 = os.path.join(level1, "level2")
1169 level3 = os.path.join(level2, "level3")
1170 try:
1171 os.mkdir(level1)
1172 os.mkdir(level2)
1173 os.mkdir(level3)
1174
1175 file1 = os.path.abspath(os.path.join(level1, "file1"))
1176
1177 with open(file1, "w") as f:
1178 f.write("file1")
1179
1180 orig_dir = os.getcwd()
1181 try:
1182 os.chdir(level2)
1183 link = os.path.join(level2, "link")
1184 os.symlink(os.path.relpath(file1), "link")
1185 self.assertIn("link", os.listdir(os.getcwd()))
1186
1187 # Check os.stat calls from the same dir as the link
1188 self.assertEqual(os.stat(file1), os.stat("link"))
1189
1190 # Check os.stat calls from a dir below the link
1191 os.chdir(level1)
1192 self.assertEqual(os.stat(file1),
1193 os.stat(os.path.relpath(link)))
1194
1195 # Check os.stat calls from a dir above the link
1196 os.chdir(level3)
1197 self.assertEqual(os.stat(file1),
1198 os.stat(os.path.relpath(link)))
1199 finally:
1200 os.chdir(orig_dir)
1201 except OSError as err:
1202 self.fail(err)
1203 finally:
1204 os.remove(file1)
1205 shutil.rmtree(level1)
1206
Brian Curtind40e6f72010-07-08 21:39:08 +00001207
Victor Stinnere8d51452010-08-19 01:05:19 +00001208class FSEncodingTests(unittest.TestCase):
1209 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001210 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1211 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001212
Victor Stinnere8d51452010-08-19 01:05:19 +00001213 def test_identity(self):
1214 # assert fsdecode(fsencode(x)) == x
1215 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1216 try:
1217 bytesfn = os.fsencode(fn)
1218 except UnicodeEncodeError:
1219 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001220 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001221
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001222
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001223class PidTests(unittest.TestCase):
1224 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1225 def test_getppid(self):
1226 p = subprocess.Popen([sys.executable, '-c',
1227 'import os; print(os.getppid())'],
1228 stdout=subprocess.PIPE)
1229 stdout, _ = p.communicate()
1230 # We are the parent of our subprocess
1231 self.assertEqual(int(stdout), os.getpid())
1232
1233
Brian Curtin0151b8e2010-09-24 13:43:43 +00001234# The introduction of this TestCase caused at least two different errors on
1235# *nix buildbots. Temporarily skip this to let the buildbots move along.
1236@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001237@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1238class LoginTests(unittest.TestCase):
1239 def test_getlogin(self):
1240 user_name = os.getlogin()
1241 self.assertNotEqual(len(user_name), 0)
1242
1243
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001244@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1245 "needs os.getpriority and os.setpriority")
1246class ProgramPriorityTests(unittest.TestCase):
1247 """Tests for os.getpriority() and os.setpriority()."""
1248
1249 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001250
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001251 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1252 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1253 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001254 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1255 if base >= 19 and new_prio <= 19:
1256 raise unittest.SkipTest(
1257 "unable to reliably test setpriority at current nice level of %s" % base)
1258 else:
1259 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001260 finally:
1261 try:
1262 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1263 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001264 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001265 raise
1266
1267
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001268if threading is not None:
1269 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001270
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001271 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001272
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001273 def __init__(self, conn):
1274 asynchat.async_chat.__init__(self, conn)
1275 self.in_buffer = []
1276 self.closed = False
1277 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001278
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001279 def handle_read(self):
1280 data = self.recv(4096)
1281 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001282
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001283 def get_data(self):
1284 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001285
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001286 def handle_close(self):
1287 self.close()
1288 self.closed = True
1289
1290 def handle_error(self):
1291 raise
1292
1293 def __init__(self, address):
1294 threading.Thread.__init__(self)
1295 asyncore.dispatcher.__init__(self)
1296 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1297 self.bind(address)
1298 self.listen(5)
1299 self.host, self.port = self.socket.getsockname()[:2]
1300 self.handler_instance = None
1301 self._active = False
1302 self._active_lock = threading.Lock()
1303
1304 # --- public API
1305
1306 @property
1307 def running(self):
1308 return self._active
1309
1310 def start(self):
1311 assert not self.running
1312 self.__flag = threading.Event()
1313 threading.Thread.start(self)
1314 self.__flag.wait()
1315
1316 def stop(self):
1317 assert self.running
1318 self._active = False
1319 self.join()
1320
1321 def wait(self):
1322 # wait for handler connection to be closed, then stop the server
1323 while not getattr(self.handler_instance, "closed", False):
1324 time.sleep(0.001)
1325 self.stop()
1326
1327 # --- internals
1328
1329 def run(self):
1330 self._active = True
1331 self.__flag.set()
1332 while self._active and asyncore.socket_map:
1333 self._active_lock.acquire()
1334 asyncore.loop(timeout=0.001, count=1)
1335 self._active_lock.release()
1336 asyncore.close_all()
1337
1338 def handle_accept(self):
1339 conn, addr = self.accept()
1340 self.handler_instance = self.Handler(conn)
1341
1342 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001343 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001344 handle_read = handle_connect
1345
1346 def writable(self):
1347 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001348
1349 def handle_error(self):
1350 raise
1351
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001352
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001353@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001354@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1355class TestSendfile(unittest.TestCase):
1356
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001357 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001358 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001359 not sys.platform.startswith("solaris") and \
1360 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001361
1362 @classmethod
1363 def setUpClass(cls):
1364 with open(support.TESTFN, "wb") as f:
1365 f.write(cls.DATA)
1366
1367 @classmethod
1368 def tearDownClass(cls):
1369 support.unlink(support.TESTFN)
1370
1371 def setUp(self):
1372 self.server = SendfileTestServer((support.HOST, 0))
1373 self.server.start()
1374 self.client = socket.socket()
1375 self.client.connect((self.server.host, self.server.port))
1376 self.client.settimeout(1)
1377 # synchronize by waiting for "220 ready" response
1378 self.client.recv(1024)
1379 self.sockno = self.client.fileno()
1380 self.file = open(support.TESTFN, 'rb')
1381 self.fileno = self.file.fileno()
1382
1383 def tearDown(self):
1384 self.file.close()
1385 self.client.close()
1386 if self.server.running:
1387 self.server.stop()
1388
1389 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1390 """A higher level wrapper representing how an application is
1391 supposed to use sendfile().
1392 """
1393 while 1:
1394 try:
1395 if self.SUPPORT_HEADERS_TRAILERS:
1396 return os.sendfile(sock, file, offset, nbytes, headers,
1397 trailers)
1398 else:
1399 return os.sendfile(sock, file, offset, nbytes)
1400 except OSError as err:
1401 if err.errno == errno.ECONNRESET:
1402 # disconnected
1403 raise
1404 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1405 # we have to retry send data
1406 continue
1407 else:
1408 raise
1409
1410 def test_send_whole_file(self):
1411 # normal send
1412 total_sent = 0
1413 offset = 0
1414 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001415 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001416 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1417 if sent == 0:
1418 break
1419 offset += sent
1420 total_sent += sent
1421 self.assertTrue(sent <= nbytes)
1422 self.assertEqual(offset, total_sent)
1423
1424 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001425 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001426 self.client.close()
1427 self.server.wait()
1428 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001429 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001430 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001431
1432 def test_send_at_certain_offset(self):
1433 # start sending a file at a certain offset
1434 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001435 offset = len(self.DATA) // 2
1436 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001437 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001438 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001439 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1440 if sent == 0:
1441 break
1442 offset += sent
1443 total_sent += sent
1444 self.assertTrue(sent <= nbytes)
1445
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001446 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001447 self.client.close()
1448 self.server.wait()
1449 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001450 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001451 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001452 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001453 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001454
1455 def test_offset_overflow(self):
1456 # specify an offset > file size
1457 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001458 try:
1459 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1460 except OSError as e:
1461 # Solaris can raise EINVAL if offset >= file length, ignore.
1462 if e.errno != errno.EINVAL:
1463 raise
1464 else:
1465 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001466 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001467 self.client.close()
1468 self.server.wait()
1469 data = self.server.handler_instance.get_data()
1470 self.assertEqual(data, b'')
1471
1472 def test_invalid_offset(self):
1473 with self.assertRaises(OSError) as cm:
1474 os.sendfile(self.sockno, self.fileno, -1, 4096)
1475 self.assertEqual(cm.exception.errno, errno.EINVAL)
1476
1477 # --- headers / trailers tests
1478
1479 if SUPPORT_HEADERS_TRAILERS:
1480
1481 def test_headers(self):
1482 total_sent = 0
1483 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1484 headers=[b"x" * 512])
1485 total_sent += sent
1486 offset = 4096
1487 nbytes = 4096
1488 while 1:
1489 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1490 offset, nbytes)
1491 if sent == 0:
1492 break
1493 total_sent += sent
1494 offset += sent
1495
1496 expected_data = b"x" * 512 + self.DATA
1497 self.assertEqual(total_sent, len(expected_data))
1498 self.client.close()
1499 self.server.wait()
1500 data = self.server.handler_instance.get_data()
1501 self.assertEqual(hash(data), hash(expected_data))
1502
1503 def test_trailers(self):
1504 TESTFN2 = support.TESTFN + "2"
Brett Cannonb6376802011-03-15 17:38:22 -04001505 with open(TESTFN2, 'wb') as f:
1506 f.write(b"abcde")
1507 with open(TESTFN2, 'rb')as f:
1508 self.addCleanup(os.remove, TESTFN2)
1509 os.sendfile(self.sockno, f.fileno(), 0, 4096,
1510 trailers=[b"12345"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001511 self.client.close()
1512 self.server.wait()
1513 data = self.server.handler_instance.get_data()
1514 self.assertEqual(data, b"abcde12345")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001515
1516 if hasattr(os, "SF_NODISKIO"):
1517 def test_flags(self):
1518 try:
1519 os.sendfile(self.sockno, self.fileno, 0, 4096,
1520 flags=os.SF_NODISKIO)
1521 except OSError as err:
1522 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1523 raise
1524
1525
Benjamin Peterson799bd802011-08-31 22:15:17 -04001526def supports_extended_attributes():
1527 if not hasattr(os, "setxattr"):
1528 return False
1529 try:
1530 with open(support.TESTFN, "wb") as fp:
1531 try:
1532 os.fsetxattr(fp.fileno(), b"user.test", b"")
1533 except OSError as e:
1534 if e.errno != errno.ENOTSUP:
1535 raise
1536 return False
1537 finally:
1538 support.unlink(support.TESTFN)
1539 # Kernels < 2.6.39 don't respect setxattr flags.
1540 kernel_version = platform.release()
1541 m = re.match("2.6.(\d{1,2})", kernel_version)
1542 return m is None or int(m.group(1)) >= 39
1543
1544
1545@unittest.skipUnless(supports_extended_attributes(),
1546 "no non-broken extended attribute support")
1547class ExtendedAttributeTests(unittest.TestCase):
1548
1549 def tearDown(self):
1550 support.unlink(support.TESTFN)
1551
1552 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
1553 fn = support.TESTFN
1554 open(fn, "wb").close()
1555 with self.assertRaises(OSError) as cm:
1556 getxattr(fn, s("user.test"))
1557 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001558 init_xattr = listxattr(fn)
1559 self.assertIsInstance(init_xattr, list)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001560 setxattr(fn, s("user.test"), b"")
Victor Stinnerf12e5062011-10-16 22:12:03 +02001561 xattr = set(init_xattr)
1562 xattr.add("user.test")
1563 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001564 self.assertEqual(getxattr(fn, b"user.test"), b"")
1565 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE)
1566 self.assertEqual(getxattr(fn, b"user.test"), b"hello")
1567 with self.assertRaises(OSError) as cm:
1568 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE)
1569 self.assertEqual(cm.exception.errno, errno.EEXIST)
1570 with self.assertRaises(OSError) as cm:
1571 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE)
1572 self.assertEqual(cm.exception.errno, errno.ENODATA)
1573 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001574 xattr.add("user.test2")
1575 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001576 removexattr(fn, s("user.test"))
1577 with self.assertRaises(OSError) as cm:
1578 getxattr(fn, s("user.test"))
1579 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001580 xattr.remove("user.test")
1581 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001582 self.assertEqual(getxattr(fn, s("user.test2")), b"foo")
1583 setxattr(fn, s("user.test"), b"a"*1024)
1584 self.assertEqual(getxattr(fn, s("user.test")), b"a"*1024)
1585 removexattr(fn, s("user.test"))
1586 many = sorted("user.test{}".format(i) for i in range(100))
1587 for thing in many:
1588 setxattr(fn, thing, b"x")
Victor Stinnerf12e5062011-10-16 22:12:03 +02001589 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04001590
1591 def _check_xattrs(self, *args):
1592 def make_bytes(s):
1593 return bytes(s, "ascii")
1594 self._check_xattrs_str(str, *args)
1595 support.unlink(support.TESTFN)
1596 self._check_xattrs_str(make_bytes, *args)
1597
1598 def test_simple(self):
1599 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
1600 os.listxattr)
1601
1602 def test_lpath(self):
1603 self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
1604 os.llistxattr)
1605
1606 def test_fds(self):
1607 def getxattr(path, *args):
1608 with open(path, "rb") as fp:
1609 return os.fgetxattr(fp.fileno(), *args)
1610 def setxattr(path, *args):
1611 with open(path, "wb") as fp:
1612 os.fsetxattr(fp.fileno(), *args)
1613 def removexattr(path, *args):
1614 with open(path, "wb") as fp:
1615 os.fremovexattr(fp.fileno(), *args)
1616 def listxattr(path, *args):
1617 with open(path, "rb") as fp:
1618 return os.flistxattr(fp.fileno(), *args)
1619 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1620
1621
Antoine Pitrouf26ad712011-07-15 23:00:56 +02001622@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00001623def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001624 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00001625 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001626 StatAttributeTests,
1627 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001628 WalkTests,
1629 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001630 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00001631 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00001632 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001633 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001634 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00001635 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00001636 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001637 Win32KillTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00001638 Win32SymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00001639 FSEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001640 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001641 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00001642 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001643 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001644 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04001645 ExtendedAttributeTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001646 )
Fred Drake2e2be372001-09-20 21:33:42 +00001647
1648if __name__ == "__main__":
1649 test_main()