blob: 293005b3a1d9275364acfc393a5e196450a4b915 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
23try:
24 import threading
25except ImportError:
26 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000027
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
#
# The flag is True only when sys.thread_info exists, reports a non-empty
# version string, and that string starts with "linuxthreads".
USING_LINUXTHREADS = (
    hasattr(sys, 'thread_info')
    and bool(sys.thread_info.version)
    and sys.thread_info.version.startswith("linuxthreads")
)
Brian Curtineb24d742010-04-12 17:16:38 +000036
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Exercise descriptor-level os functions against the scratch file TESTFN."""

    def setUp(self):
        # Remove any leftover TESTFN; the same routine doubles as tearDown.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # The reference count of the path must be the same before and
        # after a failing os.rename() call.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion or write fails,
            # so a failing run does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap it with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first so that the helper can open it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        # The source name must be gone and the target must hold its content.
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
143
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200144
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Check the result objects returned by os.stat() and os.statvfs()."""

    def setUp(self):
        # Create directory TESTFN holding a three-byte file "f1".
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        """Verify indexing, attributes and immutability of os.stat(fname)."""
        if not hasattr(os, "stat"):
            # Report the missing function instead of silently passing.
            self.skipTest('test needs os.stat()')

        import stat
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                if name.endswith("TIME"):
                    # Attribute values are truncated to int before being
                    # compared with the indexed value.
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                 result[getattr(stat, name)])
                self.assertIn(attr, members)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): the original does not fail when no exception is
        # raised here, so either outcome is accepted -- confirm intent.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other error is a real failure; swallowing it would only
            # lead to a NameError on the unbound 'result' below.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): as in the original, no failure is reported when
        # this call succeeds -- confirm intent.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def test_utime_noargs(self):
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st = os.stat(support.TESTFN)
        # Doesn't set anything new, but sets the time tuple way
        os.utime(support.TESTFN, (st.st_atime, st.st_mtime))
        # Set to the current time in the old explicit way.
        os.utime(support.TESTFN, None)
        st1 = os.stat(support.TESTFN)
        # Set to the current time in the new way
        os.utime(support.TESTFN)
        st2 = os.stat(support.TESTFN)
        self.assertAlmostEqual(st1.st_mtime, st2.st_mtime, delta=10)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    # NOTE(review): the nesting below was reconstructed from a listing with
    # collapsed indentation; confirm that test_large_time belongs under the
    # NTFS guard and test_1686475 directly under the win32 guard.
    if sys.platform == 'win32':
        def get_file_system(path):
            # Runs at class-definition time; returns the file system name
            # reported by GetVolumeInformationW for the drive holding
            # *path*, or None when that call returns a false value.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == errno.ENOENT:
                    # file does not exist; cannot run test
                    self.skipTest(r"c:\pagefile.sys does not exist")
                self.fail("Could not stat pagefile.sys")
332
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000333from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000334
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """check that os.environ object conform to mapping protocol"""
    type2test = None

    def setUp(self):
        # Snapshot the environment so tearDown can restore it, then seed
        # os.environ with the reference mapping.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_update2(self):
        # A variable set through os.environ.update() must be visible to a
        # child shell.
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_os_popen_iter(self):
        # Iterating the object returned by os.popen() yields the lines.
        with os.popen(
            "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
            it = iter(popen)
            self.assertEqual(next(it), "line1\n")
            self.assertEqual(next(it), "line2\n")
            self.assertEqual(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100475
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # The context manager closes the file even if the write fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        if support.can_symlink():
            if os.name == 'nt':
                def symlink_to_dir(src, dest):
                    os.symlink(src, dest, True)
            else:
                symlink_to_dir = os.symlink
            symlink_to_dir(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlinked directory is removed as a link, not with rmdir.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000588
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # The umask is process-wide: restore it even when an assertion
            # fails so that later tests are not affected.
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Always remove the file: tearDown calls os.removedirs(), which
            # cannot delete a regular file left behind by a failed run.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
640
class DevNullTests(unittest.TestCase):
    """Tests for the null device named by os.devnull."""

    def test_devnull(self):
        # Writing must succeed; the with block closes the file.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        # Reading must return no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000648
class URandomTests(unittest.TestCase):
    """Check that os.urandom() returns exactly as many bytes as requested."""

    def test_urandom(self):
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            # The test tolerates os.urandom() raising NotImplementedError.
            pass
658
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000659@contextlib.contextmanager
660def _execvpe_mockup(defpath=None):
661 """
662 Stubs out execv and execve functions when used as context manager.
663 Records exec calls. The mock execv and execve functions always raise an
664 exception as they would normally never return.
665 """
666 # A list of tuples containing (function name, first arg, args)
667 # of calls to execv or execve that have been made.
668 calls = []
669
670 def mock_execv(name, *args):
671 calls.append(('execv', name, args))
672 raise RuntimeError("execv called")
673
674 def mock_execve(name, *args):
675 calls.append(('execve', name, args))
676 raise OSError(errno.ENOTDIR, "execve called")
677
678 try:
679 orig_execv = os.execv
680 orig_execve = os.execve
681 orig_defpath = os.defpath
682 os.execv = mock_execv
683 os.execve = mock_execve
684 if defpath is not None:
685 os.defpath = defpath
686 yield calls
687 finally:
688 os.execv = orig_execv
689 os.execve = orig_execve
690 os.defpath = orig_defpath
691
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the private os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        # A program name that cannot be found must produce OSError.
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list must be rejected with ValueError.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Shared body for the os._execvpe() checks; *test_type* is either
        # str or bytes and selects the type used for the program name.
        search_dir = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
            path_key = b'PATH'
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            # Outside Windows the path handed to execve is expected as bytes.
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
            path_key = 'PATH'
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = dict(env)
            env_path[path_key] = search_dir
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected)

    def test_internal_execvpe_str(self):
        # The bytes variant is only exercised outside Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000755
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000756
class Win32ErrorTests(unittest.TestCase):
    """Check that failing os calls raise WindowsError.

    NOTE(review): most tests presumably rely on support.TESTFN not existing
    when they run -- confirm against the test runner's setup.
    """

    def test_rename(self):
        with self.assertRaises(WindowsError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # A regular file occupies the name, so os.mkdir() must fail.
        handle = open(support.TESTFN, "w")
        try:
            # Leaving the inner block closes the file before it is unlinked.
            with handle:
                self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
        finally:
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000780
class TestInvalidFD(unittest.TestCase):
    """Check how os functions that take a file descriptor treat a bad one.

    Every test silently does nothing when the os module lacks the function
    under test.
    """

    # Functions that take the descriptor as their only argument; a test
    # method is generated for each of them below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    # We omit close because it doesn't raise an exception on some platforms

    def get_single(f):
        # Build the test method for the os function named *f*.
        def helper(self):
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require OSError with errno EBADF."""
        try:
            f(support.make_bad_fd(), *args)
        except OSError as err:
            self.assertEqual(err.errno, errno.EBADF)
            return
        self.fail("%r didn't raise a OSError with a bad file descriptor"
                  % f)

    def test_isatty(self):
        if not hasattr(os, "isatty"):
            return
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    def test_closerange(self):
        if not hasattr(os, "closerange"):
            return
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                continue
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertIsNone(os.closerange(fd, fd + i-1))

    def test_dup2(self):
        if not hasattr(os, "dup2"):
            return
        self.check(os.dup2, 20)

    def test_fchmod(self):
        if not hasattr(os, "fchmod"):
            return
        self.check(os.fchmod, 0)

    def test_fchown(self):
        if not hasattr(os, "fchown"):
            return
        self.check(os.fchown, -1, -1)

    def test_fpathconf(self):
        if not hasattr(os, "fpathconf"):
            return
        self.check(os.fpathconf, "PC_NAME_MAX")

    def test_ftruncate(self):
        if not hasattr(os, "ftruncate"):
            return
        self.check(os.ftruncate, 0)

    def test_lseek(self):
        if not hasattr(os, "lseek"):
            return
        self.check(os.lseek, 0, 0)

    def test_read(self):
        if not hasattr(os, "read"):
            return
        self.check(os.read, 1)

    def test_tcsetpgrpt(self):
        if not hasattr(os, "tcsetpgrp"):
            return
        self.check(os.tcsetpgrp, 0)

    def test_write(self):
        if not hasattr(os, "write"):
            return
        self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000858
Brian Curtin1b9df392010-11-24 20:24:31 +0000859
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files the test left behind.
        for name in (self.file1, self.file2):
            if os.path.exists(name):
                os.unlink(name)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it to file2 and verify that both names
        # refer to the same open file.
        with open(file1, "w") as src:
            src.write("test")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(encoding),
                        self.file2.encode(encoding))

    def test_unicode_name(self):
        # Skip when the filesystem encoding cannot represent the character.
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
896
if sys.platform != 'win32':
    # Outside Windows the WindowsError-based tests are replaced by an
    # empty test case.
    class Win32ErrorTests(unittest.TestCase):
        pass

    class PosixUidGidTests(unittest.TestCase):
        """Tests for the os.set*uid()/os.set*gid() family.

        Each test is defined only when os provides the function.  When not
        running as uid 0, setting id 0 must raise os.error; an id of 1<<32
        must always raise OverflowError.
        """

        if hasattr(os, 'setuid'):
            def test_setuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setuid(0)
                with self.assertRaises(OverflowError):
                    os.setuid(1<<32)

        if hasattr(os, 'setgid'):
            def test_setgid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setgid(0)
                with self.assertRaises(OverflowError):
                    os.setgid(1<<32)

        if hasattr(os, 'seteuid'):
            def test_seteuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.seteuid(0)
                with self.assertRaises(OverflowError):
                    os.seteuid(1<<32)

        if hasattr(os, 'setegid'):
            def test_setegid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setegid(0)
                with self.assertRaises(OverflowError):
                    os.setegid(1<<32)

        if hasattr(os, 'setreuid'):
            def test_setreuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setreuid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(0, 1<<32)

            def test_setreuid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                command = [
                    sys.executable, '-c',
                    'import os,sys;os.setreuid(-1,-1);sys.exit(0)']
                subprocess.check_call(command)

        if hasattr(os, 'setregid'):
            def test_setregid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setregid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(0, 1<<32)

            def test_setregid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                command = [
                    sys.executable, '-c',
                    'import os,sys;os.setregid(-1,-1);sys.exit(0)']
                subprocess.check_call(command)

    class Pep383Tests(unittest.TestCase):
        """Access files with non-ASCII names through str paths."""

        def setUp(self):
            # Prefer the unencodable test name for the directory when the
            # support module provides one.
            self.dir = support.TESTFN_UNENCODABLE or support.TESTFN
            self.bdir = os.fsencode(self.dir)

            # Collect the bytes form of every candidate name that can be
            # encoded with the filesystem encoding.
            candidates = [support.TESTFN_UNICODE]
            if support.TESTFN_UNENCODABLE:
                candidates.append(support.TESTFN_UNENCODABLE)
            bytesfn = []
            for name in candidates:
                try:
                    bytesfn.append(os.fsencode(name))
                except UnicodeEncodeError:
                    continue
            if not bytesfn:
                self.skipTest("couldn't create any non-ascii filename")

            self.unicodefn = set()
            os.mkdir(self.dir)
            try:
                for raw in bytesfn:
                    support.create_empty_file(os.path.join(self.bdir, raw))
                    decoded = os.fsdecode(raw)
                    if decoded in self.unicodefn:
                        raise ValueError("duplicate filename")
                    self.unicodefn.add(decoded)
            except BaseException:
                # tearDown() is not run when setUp() fails, so remove the
                # directory here before re-raising.
                shutil.rmtree(self.dir)
                raise

        def tearDown(self):
            shutil.rmtree(self.dir)

        def test_listdir(self):
            self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)

        def test_open(self):
            for name in self.unicodefn:
                with open(os.path.join(self.dir, name), 'rb'):
                    pass

        def test_stat(self):
            for name in self.unicodefn:
                os.stat(os.path.join(self.dir, name))
else:
    # On Windows the POSIX-only test cases are replaced by empty ones.
    class PosixUidGidTests(unittest.TestCase):
        pass

    class Pep383Tests(unittest.TestCase):
        pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001009
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and have it tell the parent
        # when the interpreter is ready.  Once it is, send *sig* via os.kill
        # and check that the subprocess' return code equals *sig*.
        import ctypes
        import msvcrt
        from ctypes import wintypes

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting.  This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        peek_named_pipe = ctypes.windll.kernel32.PeekNamedPipe
        peek_named_pipe.restype = wintypes.BOOL
        dword_ptr = ctypes.POINTER(wintypes.DWORD)
        peek_named_pipe.argtypes = (
            wintypes.HANDLE,                # Pipe handle
            ctypes.POINTER(ctypes.c_char),  # stdout buf
            wintypes.DWORD,                 # Buffer size
            dword_ptr,                      # bytes read
            dword_ptr,                      # bytes avail
            dword_ptr)                      # bytes left
        msg = "running"
        child_code = ("import sys;"
                      "sys.stdout.write('{}');"
                      "sys.stdout.flush();"
                      "input()").format(msg)
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        for stream in (proc.stdout, proc.stderr, proc.stdin):
            self.addCleanup(stream.close)

        attempts, limit = 0, 100
        while attempts < limit and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the
            # pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            pipe_handle = msvcrt.get_osfhandle(proc.stdout.fileno())
            rslt = peek_named_pipe(pipe_handle, buf, ctypes.sizeof(buf),
                                   None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # The loop ended without seeing any output from the child.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait for it
        # to flag readiness through a tagged one-byte mmap, then send *event*
        # and require that the process stops.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        m[0] = 0
        # Run a script which has console control handling enabled.
        script = os.path.join(os.path.dirname(__file__),
                              "win_console_handler.py")
        proc = subprocess.Popen(
            [sys.executable, script, tagname],
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, limit = 0, 100
        while attempts < limit and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        import ctypes
        from ctypes import wintypes

        # Make a NULL value by creating a pointer with no argument.
        int_ptr = ctypes.POINTER(ctypes.c_int)
        NULL = int_ptr()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (int_ptr, wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1124
1125
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and link-aware stat calls on Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # The targets must exist and no link may be left from an earlier run.
        assert os.path.exists(self.dirlink_target)
        assert os.path.exists(self.filelink_target)
        assert not os.path.exists(self.dirlink)
        assert not os.path.exists(self.filelink)
        assert not os.path.exists(self.missing_link)

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is used because this link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink, True)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the bytes form of the link name.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        """os.stat() on a relative symlink must work from several directories.

        A link with a relative target is created in level2; it is then
        stat'ed from level2 itself, from its parent and from its child.
        """
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so that the cleanup in the finally
        # clause can always refer to it, even when creating the directories
        # fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so that a failure
            # during cleanup cannot hide the original error.
            if os.path.exists(file1):
                os.remove(file1)
            if os.path.exists(level1):
                shutil.rmtree(level1)
1243
Brian Curtind40e6f72010-07-08 21:39:08 +00001244
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() must come
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Names the filesystem encoding cannot represent are skipped.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001258
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001259
class PidTests(unittest.TestCase):
    """Tests for the process-id functions of the os module."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid, which must be our own pid
        # because we are the parent of our subprocess.
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        self.assertEqual(int(output), os.getpid())
1269
1270
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Test for os.getlogin(); currently skipped unconditionally."""

    def test_getlogin(self):
        # The login name must be a non-empty string.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
1279
1280
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own nice value by one, read it back, then try to put
        # the original value back.
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                # Already at nice level 19 or above: the increment cannot
                # be observed, so the result would be meaningless.
                raise unittest.SkipTest(
      "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                # Failing to restore the priority with EACCES is tolerated;
                # any other error is propagated.
                if err.errno != errno.EACCES:
                    raise
1303
1304
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001305if threading is not None:
1306 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001307
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001308 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001309
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001310 def __init__(self, conn):
1311 asynchat.async_chat.__init__(self, conn)
1312 self.in_buffer = []
1313 self.closed = False
1314 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001315
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001316 def handle_read(self):
1317 data = self.recv(4096)
1318 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001319
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001320 def get_data(self):
1321 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001322
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001323 def handle_close(self):
1324 self.close()
1325 self.closed = True
1326
1327 def handle_error(self):
1328 raise
1329
1330 def __init__(self, address):
1331 threading.Thread.__init__(self)
1332 asyncore.dispatcher.__init__(self)
1333 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1334 self.bind(address)
1335 self.listen(5)
1336 self.host, self.port = self.socket.getsockname()[:2]
1337 self.handler_instance = None
1338 self._active = False
1339 self._active_lock = threading.Lock()
1340
1341 # --- public API
1342
1343 @property
1344 def running(self):
1345 return self._active
1346
1347 def start(self):
1348 assert not self.running
1349 self.__flag = threading.Event()
1350 threading.Thread.start(self)
1351 self.__flag.wait()
1352
1353 def stop(self):
1354 assert self.running
1355 self._active = False
1356 self.join()
1357
1358 def wait(self):
1359 # wait for handler connection to be closed, then stop the server
1360 while not getattr(self.handler_instance, "closed", False):
1361 time.sleep(0.001)
1362 self.stop()
1363
1364 # --- internals
1365
1366 def run(self):
1367 self._active = True
1368 self.__flag.set()
1369 while self._active and asyncore.socket_map:
1370 self._active_lock.acquire()
1371 asyncore.loop(timeout=0.001, count=1)
1372 self._active_lock.release()
1373 asyncore.close_all()
1374
1375 def handle_accept(self):
1376 conn, addr = self.accept()
1377 self.handler_instance = self.Handler(conn)
1378
def handle_connect(self):
    """Close this dispatcher."""
    self.close()
# Read events are handled exactly like connect events.
handle_read = handle_connect
1382
def writable(self):
    """Always return 0 (a false value)."""
    result = 0
    return result
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001385
def handle_error(self):
    """Re-raise the exception currently being handled."""
    # Bare raise: propagate the active exception unchanged to the caller.
    raise
1388
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001389
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() run against a local SendfileTestServer.

    setUpClass() writes DATA to support.TESTFN; each test then connects a
    client socket to a fresh server and sends (parts of) that file.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # Header/trailer tests are only defined off Linux, Solaris and SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError (including ECONNRESET) is propagated.  *headers* and
        *trailers* default to empty lists and are only forwarded when
        SUPPORT_HEADERS_TRAILERS is true.
        """
        # None sentinels avoid sharing one mutable default between calls.
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    # disconnected or unexpected failure
                    raise
                # otherwise we have to retry sending the data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(hash(data), hash(expected_data))

        def test_trailers(self):
            TESTFN2 = support.TESTFN + "2"
            # Register the cleanup before creating the file so it is
            # removed even when a later step fails.
            self.addCleanup(support.unlink, TESTFN2)
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcde12345")

    if hasattr(os, "SF_NODISKIO"):
        def test_flags(self):
            try:
                os.sendfile(self.sockno, self.fileno, 0, 4096,
                            flags=os.SF_NODISKIO)
            except OSError as err:
                if err.errno not in (errno.EBUSY, errno.EAGAIN):
                    raise
1561
1562
def supports_extended_attributes():
    """Return True if usable extended-attribute support is available.

    Returns False when ``os`` has no ``setxattr``, when setting a
    ``user.test`` attribute on a scratch file fails with ENOTSUP, or when
    the kernel release reported by ``platform.release()`` is a 2.6.x
    older than 2.6.39.  Any other OSError from the probe is propagated.
    The scratch file (support.TESTFN) is always removed.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.fsetxattr(fp.fileno(), b"user.test", b"")
            except OSError as e:
                if e.errno != errno.ENOTSUP:
                    raise
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: match a literal "2.6." prefix only.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
1580
1581
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Round-trip tests for the get/set/remove/list xattr function families."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _expect_errno(self, code, func, *args):
        # Call func(*args) and require an OSError carrying errno *code*.
        with self.assertRaises(OSError) as caught:
            func(*args)
        self.assertEqual(caught.exception.errno, code)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
        # *s* converts an attribute name to the type under test (str/bytes).
        path = support.TESTFN
        with open(path, "wb"):
            pass

        # A fresh file has no user.test attribute.
        self._expect_errno(errno.ENODATA, getxattr, path, s("user.test"))
        baseline = listxattr(path)
        self.assertIsInstance(baseline, list)

        # Create an empty attribute, then replace its value.
        setxattr(path, s("user.test"), b"")
        expected = set(baseline)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test"), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE)
        self.assertEqual(getxattr(path, b"user.test"), b"hello")

        # XATTR_CREATE on an existing name and XATTR_REPLACE on a missing
        # name must both fail.
        self._expect_errno(errno.EEXIST, setxattr,
                           path, s("user.test"), b"bye", os.XATTR_CREATE)
        self._expect_errno(errno.ENODATA, setxattr,
                           path, s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removal makes the attribute unreadable and drops it from the list.
        removexattr(path, s("user.test"))
        self._expect_errno(errno.ENODATA, getxattr, path, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2")), b"foo")

        # A 1024-byte value round-trips.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value)
        self.assertEqual(getxattr(path, s("user.test")), big_value)
        removexattr(path, s("user.test"))

        # One hundred attributes can be set and are all listed.
        many = sorted("user.test{}".format(i) for i in range(100))
        for name in many:
            setxattr(path, name, b"x")
        self.assertEqual(set(listxattr(path)), set(baseline) | set(many))

    def _check_xattrs(self, *args):
        # Run the scenario with str names, then again with bytes names,
        # removing the scratch file in between.
        def as_bytes(text):
            return bytes(text, "ascii")
        for index, convert in enumerate((str, as_bytes)):
            if index:
                support.unlink(support.TESTFN)
            self._check_xattrs_str(convert, *args)

    def test_simple(self):
        functions = (os.getxattr, os.setxattr, os.removexattr, os.listxattr)
        self._check_xattrs(*functions)

    def test_lpath(self):
        functions = (os.lgetxattr, os.lsetxattr, os.lremovexattr,
                     os.llistxattr)
        self._check_xattrs(*functions)

    def test_fds(self):
        # Adapt the fd-based functions to the path-based call signature.
        def getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.fgetxattr(stream.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb") as stream:
                os.fsetxattr(stream.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb") as stream:
                os.fremovexattr(stream.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.flistxattr(stream.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1657
1658
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Check that bytes filenames raise DeprecationWarning on Windows."""

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # Each entry is (callable, *arguments).
        calls = (
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        )
        with warnings.catch_warnings():
            # Turn the warning into an exception so assertRaises sees it.
            warnings.simplefilter("error", DeprecationWarning)
            for call in calls:
                func, args = call[0], call[1:]
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(filename, filename)
1694
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001695
@support.reap_threads
def test_main():
    """Run the listed test case classes through support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
    )
    support.run_unittest(*test_classes)
Fred Drake2e2be372001-09-20 21:33:42 +00001722
# Run test_main() when this file is executed as a script.
if __name__ == "__main__":
    test_main()