blob: d2595153d851ec492faaec8ed93d6552cb0846bb [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000027try:
28 import threading
29except ImportError:
30 threading = None
Georg Brandl2daf6ae2012-02-20 19:54:16 +010031from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000032
Victor Stinner1aa54a42012-02-08 04:09:37 +010033os.stat_float_times(True)
34st = os.stat(__file__)
35stat_supports_subsecond = (
36 # check if float and int timestamps are different
37 (st.st_atime != st[7])
38 or (st.st_mtime != st[8])
39 or (st.st_ctime != st[9]))
40
Mark Dickinson7cf03892010-04-16 13:45:35 +000041# Detect whether we're on a Linux system that uses the (now outdated
42# and unmaintained) linuxthreads threading library. There's an issue
43# when combining linuxthreads with a failed execv call: see
44# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020045if hasattr(sys, 'thread_info') and sys.thread_info.version:
46 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
47else:
48 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000049
Thomas Wouters0e3f5912006-08-11 14:57:12 +000050# Tests creating TESTFN
51class FileTests(unittest.TestCase):
52 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000053 if os.path.exists(support.TESTFN):
54 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000055 tearDown = setUp
56
57 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000058 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000059 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000060 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000061
Christian Heimesfdab48e2008-01-20 09:06:41 +000062 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000063 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
64 # We must allocate two consecutive file descriptors, otherwise
65 # it will mess up other file descriptors (perhaps even the three
66 # standard ones).
67 second = os.dup(first)
68 try:
69 retries = 0
70 while second != first + 1:
71 os.close(first)
72 retries += 1
73 if retries > 10:
74 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000075 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000076 first, second = second, os.dup(second)
77 finally:
78 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000079 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000080 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000081 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000082
Benjamin Peterson1cc6df92010-06-30 17:39:45 +000083 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +000084 def test_rename(self):
85 path = support.TESTFN
86 old = sys.getrefcount(path)
87 self.assertRaises(TypeError, os.rename, path, 0)
88 new = sys.getrefcount(path)
89 self.assertEqual(old, new)
90
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000091 def test_read(self):
92 with open(support.TESTFN, "w+b") as fobj:
93 fobj.write(b"spam")
94 fobj.flush()
95 fd = fobj.fileno()
96 os.lseek(fd, 0, 0)
97 s = os.read(fd, 4)
98 self.assertEqual(type(s), bytes)
99 self.assertEqual(s, b"spam")
100
101 def test_write(self):
102 # os.write() accepts bytes- and buffer-like objects but not strings
103 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
104 self.assertRaises(TypeError, os.write, fd, "beans")
105 os.write(fd, b"bacon\n")
106 os.write(fd, bytearray(b"eggs\n"))
107 os.write(fd, memoryview(b"spam\n"))
108 os.close(fd)
109 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000110 self.assertEqual(fobj.read().splitlines(),
111 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000112
Victor Stinnere0daff12011-03-20 23:36:35 +0100113 def write_windows_console(self, *args):
114 retcode = subprocess.call(args,
115 # use a new console to not flood the test output
116 creationflags=subprocess.CREATE_NEW_CONSOLE,
117 # use a shell to hide the console window (SW_HIDE)
118 shell=True)
119 self.assertEqual(retcode, 0)
120
121 @unittest.skipUnless(sys.platform == 'win32',
122 'test specific to the Windows console')
123 def test_write_windows_console(self):
124 # Issue #11395: the Windows console returns an error (12: not enough
125 # space error) on writing into stdout if stdout mode is binary and the
126 # length is greater than 66,000 bytes (or less, depending on heap
127 # usage).
128 code = "print('x' * 100000)"
129 self.write_windows_console(sys.executable, "-c", code)
130 self.write_windows_console(sys.executable, "-u", "-c", code)
131
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000132 def fdopen_helper(self, *args):
133 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200134 f = os.fdopen(fd, *args)
135 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000136
137 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200138 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
139 os.close(fd)
140
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000141 self.fdopen_helper()
142 self.fdopen_helper('r')
143 self.fdopen_helper('r', 100)
144
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100145 def test_replace(self):
146 TESTFN2 = support.TESTFN + ".2"
147 with open(support.TESTFN, 'w') as f:
148 f.write("1")
149 with open(TESTFN2, 'w') as f:
150 f.write("2")
151 self.addCleanup(os.unlink, TESTFN2)
152 os.replace(support.TESTFN, TESTFN2)
153 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
154 with open(TESTFN2, 'r') as f:
155 self.assertEqual(f.read(), "1")
156
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200157
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000158# Test attributes on return values from os.*stat* family.
159class StatAttributeTests(unittest.TestCase):
160 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000161 os.mkdir(support.TESTFN)
162 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000163 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000164 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000165 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000166
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000167 def tearDown(self):
168 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000169 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000170
Antoine Pitrou38425292010-09-21 18:19:07 +0000171 def check_stat_attributes(self, fname):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000172 if not hasattr(os, "stat"):
173 return
174
Antoine Pitrou38425292010-09-21 18:19:07 +0000175 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000176
177 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000178 self.assertEqual(result[stat.ST_SIZE], 3)
179 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000180
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000181 # Make sure all the attributes are there
182 members = dir(result)
183 for name in dir(stat):
184 if name[:3] == 'ST_':
185 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000186 if name.endswith("TIME"):
187 def trunc(x): return int(x)
188 else:
189 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000190 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000191 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000192 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000193
194 try:
195 result[200]
196 self.fail("No exception thrown")
197 except IndexError:
198 pass
199
200 # Make sure that assignment fails
201 try:
202 result.st_mode = 1
203 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000204 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000205 pass
206
207 try:
208 result.st_rdev = 1
209 self.fail("No exception thrown")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000210 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000211 pass
212
213 try:
214 result.parrot = 1
215 self.fail("No exception thrown")
216 except AttributeError:
217 pass
218
219 # Use the stat_result constructor with a too-short tuple.
220 try:
221 result2 = os.stat_result((10,))
222 self.fail("No exception thrown")
223 except TypeError:
224 pass
225
Ezio Melotti42da6632011-03-15 05:18:48 +0200226 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000227 try:
228 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
229 except TypeError:
230 pass
231
Antoine Pitrou38425292010-09-21 18:19:07 +0000232 def test_stat_attributes(self):
233 self.check_stat_attributes(self.fname)
234
235 def test_stat_attributes_bytes(self):
236 try:
237 fname = self.fname.encode(sys.getfilesystemencoding())
238 except UnicodeEncodeError:
239 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100240 with warnings.catch_warnings():
241 warnings.simplefilter("ignore", DeprecationWarning)
242 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000243
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000244 def test_statvfs_attributes(self):
245 if not hasattr(os, "statvfs"):
246 return
247
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000248 try:
249 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000250 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000251 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000252 if e.errno == errno.ENOSYS:
253 return
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000254
255 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000256 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000257
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000258 # Make sure all the attributes are there.
259 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
260 'ffree', 'favail', 'flag', 'namemax')
261 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000262 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000263
264 # Make sure that assignment really fails
265 try:
266 result.f_bfree = 1
267 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000268 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000269 pass
270
271 try:
272 result.parrot = 1
273 self.fail("No exception thrown")
274 except AttributeError:
275 pass
276
277 # Use the constructor with a too-short tuple.
278 try:
279 result2 = os.statvfs_result((10,))
280 self.fail("No exception thrown")
281 except TypeError:
282 pass
283
Ezio Melotti42da6632011-03-15 05:18:48 +0200284 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000285 try:
286 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
287 except TypeError:
288 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000289
Thomas Wouters89f507f2006-12-13 04:49:30 +0000290 def test_utime_dir(self):
291 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000292 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000293 # round to int, because some systems may support sub-second
294 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000295 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
296 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000297 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000298
Brian Curtin52fbea12011-11-06 13:41:17 -0600299 def test_utime_noargs(self):
Brian Curtin0277aa32011-11-06 13:50:15 -0600300 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600301 # second argument. Check that the previous methods of passing
302 # a time tuple or None work in addition to no argument.
303 st = os.stat(support.TESTFN)
304 # Doesn't set anything new, but sets the time tuple way
305 os.utime(support.TESTFN, (st.st_atime, st.st_mtime))
306 # Set to the current time in the old explicit way.
307 os.utime(support.TESTFN, None)
308 st1 = os.stat(support.TESTFN)
309 # Set to the current time in the new way
310 os.utime(support.TESTFN)
311 st2 = os.stat(support.TESTFN)
312 self.assertAlmostEqual(st1.st_mtime, st2.st_mtime, delta=10)
313
Victor Stinner1aa54a42012-02-08 04:09:37 +0100314 @unittest.skipUnless(stat_supports_subsecond,
315 "os.stat() doesn't has a subsecond resolution")
Victor Stinnera2f7c002012-02-08 03:36:25 +0100316 def _test_utime_subsecond(self, set_time_func):
Victor Stinnerbe557de2012-02-08 03:01:11 +0100317 asec, amsec = 1, 901
318 atime = asec + amsec * 1e-3
Victor Stinnera2f7c002012-02-08 03:36:25 +0100319 msec, mmsec = 2, 901
Victor Stinnerbe557de2012-02-08 03:01:11 +0100320 mtime = msec + mmsec * 1e-3
321 filename = self.fname
Victor Stinnera2f7c002012-02-08 03:36:25 +0100322 os.utime(filename, (0, 0))
323 set_time_func(filename, atime, mtime)
Victor Stinner1aa54a42012-02-08 04:09:37 +0100324 os.stat_float_times(True)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100325 st = os.stat(filename)
326 self.assertAlmostEqual(st.st_atime, atime, places=3)
327 self.assertAlmostEqual(st.st_mtime, mtime, places=3)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100328
Victor Stinnera2f7c002012-02-08 03:36:25 +0100329 def test_utime_subsecond(self):
330 def set_time(filename, atime, mtime):
331 os.utime(filename, (atime, mtime))
332 self._test_utime_subsecond(set_time)
333
334 @unittest.skipUnless(hasattr(os, 'futimes'),
335 "os.futimes required for this test.")
336 def test_futimes_subsecond(self):
337 def set_time(filename, atime, mtime):
338 with open(filename, "wb") as f:
339 os.futimes(f.fileno(), (atime, mtime))
340 self._test_utime_subsecond(set_time)
341
342 @unittest.skipUnless(hasattr(os, 'futimens'),
343 "os.futimens required for this test.")
344 def test_futimens_subsecond(self):
345 def set_time(filename, atime, mtime):
346 with open(filename, "wb") as f:
347 asec, ansec = divmod(atime, 1.0)
348 asec = int(asec)
349 ansec = int(ansec * 1e9)
350 msec, mnsec = divmod(mtime, 1.0)
351 msec = int(msec)
352 mnsec = int(mnsec * 1e9)
353 os.futimens(f.fileno(),
354 (asec, ansec),
355 (msec, mnsec))
356 self._test_utime_subsecond(set_time)
357
358 @unittest.skipUnless(hasattr(os, 'futimesat'),
359 "os.futimesat required for this test.")
360 def test_futimesat_subsecond(self):
361 def set_time(filename, atime, mtime):
362 dirname = os.path.dirname(filename)
363 dirfd = os.open(dirname, os.O_RDONLY)
364 try:
365 os.futimesat(dirfd, os.path.basename(filename),
366 (atime, mtime))
367 finally:
368 os.close(dirfd)
369 self._test_utime_subsecond(set_time)
370
371 @unittest.skipUnless(hasattr(os, 'lutimes'),
372 "os.lutimes required for this test.")
373 def test_lutimes_subsecond(self):
374 def set_time(filename, atime, mtime):
375 os.lutimes(filename, (atime, mtime))
376 self._test_utime_subsecond(set_time)
377
378 @unittest.skipUnless(hasattr(os, 'utimensat'),
379 "os.utimensat required for this test.")
380 def test_utimensat_subsecond(self):
381 def set_time(filename, atime, mtime):
382 dirname = os.path.dirname(filename)
383 dirfd = os.open(dirname, os.O_RDONLY)
384 try:
385 asec, ansec = divmod(atime, 1.0)
386 asec = int(asec)
387 ansec = int(ansec * 1e9)
388 msec, mnsec = divmod(mtime, 1.0)
389 msec = int(msec)
390 mnsec = int(mnsec * 1e9)
391 os.utimensat(dirfd, os.path.basename(filename),
392 (asec, ansec),
393 (msec, mnsec))
394 finally:
395 os.close(dirfd)
396 self._test_utime_subsecond(set_time)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100397
Thomas Wouters89f507f2006-12-13 04:49:30 +0000398 # Restrict test to Win32, since there is no guarantee other
399 # systems support centiseconds
400 if sys.platform == 'win32':
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000401 def get_file_system(path):
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000402 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000403 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000404 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000405 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000406 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000407 return buf.value
408
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000409 if get_file_system(support.TESTFN) == "NTFS":
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000410 def test_1565150(self):
411 t1 = 1159195039.25
412 os.utime(self.fname, (t1, t1))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000413 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000414
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000415 def test_large_time(self):
416 t1 = 5000000000 # some day in 2128
417 os.utime(self.fname, (t1, t1))
418 self.assertEqual(os.stat(self.fname).st_mtime, t1)
419
Guido van Rossumd8faa362007-04-27 19:54:29 +0000420 def test_1686475(self):
421 # Verify that an open file can be stat'ed
422 try:
423 os.stat(r"c:\pagefile.sys")
424 except WindowsError as e:
Benjamin Petersonc4fe6f32008-08-19 18:57:56 +0000425 if e.errno == 2: # file does not exist; cannot run test
Guido van Rossumd8faa362007-04-27 19:54:29 +0000426 return
427 self.fail("Could not stat pagefile.sys")
428
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000429from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000430
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000431class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000432 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000433 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000434
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000435 def setUp(self):
436 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000437 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000438 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000439 for key, value in self._reference().items():
440 os.environ[key] = value
441
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000442 def tearDown(self):
443 os.environ.clear()
444 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000445 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000446 os.environb.clear()
447 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000448
Christian Heimes90333392007-11-01 19:08:42 +0000449 def _reference(self):
450 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
451
452 def _empty_mapping(self):
453 os.environ.clear()
454 return os.environ
455
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000456 # Bug 1110478
Martin v. Löwis5510f652005-02-17 21:23:20 +0000457 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000458 os.environ.clear()
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000459 if os.path.exists("/bin/sh"):
460 os.environ.update(HELLO="World")
Brian Curtin810921b2010-10-30 21:24:21 +0000461 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
462 value = popen.read().strip()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000463 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000464
Christian Heimes1a13d592007-11-08 14:16:55 +0000465 def test_os_popen_iter(self):
466 if os.path.exists("/bin/sh"):
Brian Curtin810921b2010-10-30 21:24:21 +0000467 with os.popen(
468 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
469 it = iter(popen)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000470 self.assertEqual(next(it), "line1\n")
471 self.assertEqual(next(it), "line2\n")
472 self.assertEqual(next(it), "line3\n")
Brian Curtin810921b2010-10-30 21:24:21 +0000473 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000474
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000475 # Verify environ keys and values from the OS are of the
476 # correct str type.
477 def test_keyvalue_types(self):
478 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000479 self.assertEqual(type(key), str)
480 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000481
Christian Heimes90333392007-11-01 19:08:42 +0000482 def test_items(self):
483 for key, value in self._reference().items():
484 self.assertEqual(os.environ.get(key), value)
485
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000486 # Issue 7310
487 def test___repr__(self):
488 """Check that the repr() of os.environ looks like environ({...})."""
489 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000490 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
491 '{!r}: {!r}'.format(key, value)
492 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000493
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000494 def test_get_exec_path(self):
495 defpath_list = os.defpath.split(os.pathsep)
496 test_path = ['/monty', '/python', '', '/flying/circus']
497 test_env = {'PATH': os.pathsep.join(test_path)}
498
499 saved_environ = os.environ
500 try:
501 os.environ = dict(test_env)
502 # Test that defaulting to os.environ works.
503 self.assertSequenceEqual(test_path, os.get_exec_path())
504 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
505 finally:
506 os.environ = saved_environ
507
508 # No PATH environment variable
509 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
510 # Empty PATH environment variable
511 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
512 # Supplied PATH environment variable
513 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
514
Victor Stinnerb745a742010-05-18 17:17:23 +0000515 if os.supports_bytes_environ:
516 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000517 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000518 # ignore BytesWarning warning
519 with warnings.catch_warnings(record=True):
520 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000521 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000522 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000523 pass
524 else:
525 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000526
527 # bytes key and/or value
528 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
529 ['abc'])
530 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
531 ['abc'])
532 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
533 ['abc'])
534
535 @unittest.skipUnless(os.supports_bytes_environ,
536 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000537 def test_environb(self):
538 # os.environ -> os.environb
539 value = 'euro\u20ac'
540 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000541 value_bytes = value.encode(sys.getfilesystemencoding(),
542 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000543 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000544 msg = "U+20AC character is not encodable to %s" % (
545 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000546 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000547 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000548 self.assertEqual(os.environ['unicode'], value)
549 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000550
551 # os.environb -> os.environ
552 value = b'\xff'
553 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000554 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000555 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000556 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000557
Charles-François Natali2966f102011-11-26 11:32:46 +0100558 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
559 # #13415).
560 @support.requires_freebsd_version(7)
561 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100562 def test_unset_error(self):
563 if sys.platform == "win32":
564 # an environment variable is limited to 32,767 characters
565 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100566 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100567 else:
568 # "=" is not allowed in a variable name
569 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100570 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100571
Tim Petersc4e09402003-04-25 07:11:48 +0000572class WalkTests(unittest.TestCase):
573 """Tests for os.walk()."""
574
Charles-François Natali7372b062012-02-05 15:15:38 +0100575 def setUp(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000576 import os
577 from os.path import join
578
579 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000580 # TESTFN/
581 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000582 # tmp1
583 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000584 # tmp2
585 # SUB11/ no kids
586 # SUB2/ a file kid and a dirsymlink kid
587 # tmp3
588 # link/ a symlink to TESTFN.2
589 # TEST2/
590 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000591 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000592 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000593 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000594 sub2_path = join(walk_path, "SUB2")
595 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000596 tmp2_path = join(sub1_path, "tmp2")
597 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000598 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000599 t2_path = join(support.TESTFN, "TEST2")
600 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Tim Petersc4e09402003-04-25 07:11:48 +0000601
602 # Create stuff.
603 os.makedirs(sub11_path)
604 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000605 os.makedirs(t2_path)
606 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000607 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000608 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
609 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000610 if support.can_symlink():
Antoine Pitrou5311c1d2012-01-24 08:59:28 +0100611 if os.name == 'nt':
612 def symlink_to_dir(src, dest):
613 os.symlink(src, dest, True)
614 else:
615 symlink_to_dir = os.symlink
616 symlink_to_dir(os.path.abspath(t2_path), link_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000617 sub2_tree = (sub2_path, ["link"], ["tmp3"])
618 else:
619 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000620
621 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000622 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000623 self.assertEqual(len(all), 4)
624 # We can't know which order SUB1 and SUB2 will appear in.
625 # Not flipped: TESTFN, SUB1, SUB11, SUB2
626 # flipped: TESTFN, SUB2, SUB1, SUB11
627 flipped = all[0][1][0] != "SUB1"
628 all[0][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000629 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000630 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
631 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000632 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000633
634 # Prune the search.
635 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000636 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000637 all.append((root, dirs, files))
638 # Don't descend into SUB1.
639 if 'SUB1' in dirs:
640 # Note that this also mutates the dirs we appended to all!
641 dirs.remove('SUB1')
642 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000643 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
644 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000645
646 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000647 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000648 self.assertEqual(len(all), 4)
649 # We can't know which order SUB1 and SUB2 will appear in.
650 # Not flipped: SUB11, SUB1, SUB2, TESTFN
651 # flipped: SUB2, SUB11, SUB1, TESTFN
652 flipped = all[3][1][0] != "SUB1"
653 all[3][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000654 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000655 self.assertEqual(all[flipped], (sub11_path, [], []))
656 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000657 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000658
Brian Curtin3b4499c2010-12-28 14:31:47 +0000659 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000660 # Walk, following symlinks.
661 for root, dirs, files in os.walk(walk_path, followlinks=True):
662 if root == link_path:
663 self.assertEqual(dirs, [])
664 self.assertEqual(files, ["tmp4"])
665 break
666 else:
667 self.fail("Didn't follow symlink with followlinks=True")
668
669 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000670 # Tear everything down. This is a decent use for bottom-up on
671 # Windows, which doesn't have a recursive delete command. The
672 # (not so) subtlety is that rmdir will fail unless the dir's
673 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000674 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000675 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000676 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000677 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000678 dirname = os.path.join(root, name)
679 if not os.path.islink(dirname):
680 os.rmdir(dirname)
681 else:
682 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000683 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000684
Charles-François Natali7372b062012-02-05 15:15:38 +0100685
686@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
687class FwalkTests(WalkTests):
688 """Tests for os.fwalk()."""
689
690 def test_compare_to_walk(self):
691 # compare with walk() results
692 for topdown, followlinks in itertools.product((True, False), repeat=2):
693 args = support.TESTFN, topdown, None, followlinks
694 expected = {}
695 for root, dirs, files in os.walk(*args):
696 expected[root] = (set(dirs), set(files))
697
698 for root, dirs, files, rootfd in os.fwalk(*args):
699 self.assertIn(root, expected)
700 self.assertEqual(expected[root], (set(dirs), set(files)))
701
702 def test_dir_fd(self):
703 # check returned file descriptors
704 for topdown, followlinks in itertools.product((True, False), repeat=2):
705 args = support.TESTFN, topdown, None, followlinks
706 for root, dirs, files, rootfd in os.fwalk(*args):
707 # check that the FD is valid
708 os.fstat(rootfd)
Charles-François Natali77940902012-02-06 19:54:48 +0100709 # check that flistdir() returns consistent information
710 self.assertEqual(set(os.flistdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100711
712 def test_fd_leak(self):
713 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
714 # we both check that calling fwalk() a large number of times doesn't
715 # yield EMFILE, and that the minimum allocated FD hasn't changed.
716 minfd = os.dup(1)
717 os.close(minfd)
718 for i in range(256):
719 for x in os.fwalk(support.TESTFN):
720 pass
721 newfd = os.dup(1)
722 self.addCleanup(os.close, newfd)
723 self.assertEqual(newfd, minfd)
724
725 def tearDown(self):
726 # cleanup
727 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
728 for name in files:
729 os.unlinkat(rootfd, name)
730 for name in dirs:
731 st = os.fstatat(rootfd, name, os.AT_SYMLINK_NOFOLLOW)
732 if stat.S_ISDIR(st.st_mode):
733 os.unlinkat(rootfd, name, os.AT_REMOVEDIR)
734 else:
735 os.unlinkat(rootfd, name)
736 os.rmdir(support.TESTFN)
737
738
Guido van Rossume7ba4952007-06-06 23:52:48 +0000739class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000740 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000741 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000742
743 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000744 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000745 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
746 os.makedirs(path) # Should work
747 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
748 os.makedirs(path)
749
750 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000751 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000752 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
753 os.makedirs(path)
754 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
755 'dir5', 'dir6')
756 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000757
Terry Reedy5a22b652010-12-02 07:05:56 +0000758 def test_exist_ok_existing_directory(self):
759 path = os.path.join(support.TESTFN, 'dir1')
760 mode = 0o777
761 old_mask = os.umask(0o022)
762 os.makedirs(path, mode)
763 self.assertRaises(OSError, os.makedirs, path, mode)
764 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
765 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
766 os.makedirs(path, mode=mode, exist_ok=True)
767 os.umask(old_mask)
768
769 def test_exist_ok_existing_regular_file(self):
770 base = support.TESTFN
771 path = os.path.join(support.TESTFN, 'dir1')
772 f = open(path, 'w')
773 f.write('abc')
774 f.close()
775 self.assertRaises(OSError, os.makedirs, path)
776 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
777 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
778 os.remove(path)
779
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000780 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000781 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000782 'dir4', 'dir5', 'dir6')
783 # If the tests failed, the bottom-most directory ('../dir6')
784 # may not have been created, so we look for the outermost directory
785 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000786 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000787 path = os.path.dirname(path)
788
789 os.removedirs(path)
790
Guido van Rossume7ba4952007-06-06 23:52:48 +0000791class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000792 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +0200793 with open(os.devnull, 'wb') as f:
794 f.write(b'hello')
795 f.close()
796 with open(os.devnull, 'rb') as f:
797 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000798
Guido van Rossume7ba4952007-06-06 23:52:48 +0000799class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +0100800 def test_urandom_length(self):
801 self.assertEqual(len(os.urandom(0)), 0)
802 self.assertEqual(len(os.urandom(1)), 1)
803 self.assertEqual(len(os.urandom(10)), 10)
804 self.assertEqual(len(os.urandom(100)), 100)
805 self.assertEqual(len(os.urandom(1000)), 1000)
806
807 def test_urandom_value(self):
808 data1 = os.urandom(16)
809 data2 = os.urandom(16)
810 self.assertNotEqual(data1, data2)
811
812 def get_urandom_subprocess(self, count):
813 code = '\n'.join((
814 'import os, sys',
815 'data = os.urandom(%s)' % count,
816 'sys.stdout.buffer.write(data)',
817 'sys.stdout.buffer.flush()'))
818 out = assert_python_ok('-c', code)
819 stdout = out[1]
820 self.assertEqual(len(stdout), 16)
821 return stdout
822
823 def test_urandom_subprocess(self):
824 data1 = self.get_urandom_subprocess(16)
825 data2 = self.get_urandom_subprocess(16)
826 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000827
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000828@contextlib.contextmanager
829def _execvpe_mockup(defpath=None):
830 """
831 Stubs out execv and execve functions when used as context manager.
832 Records exec calls. The mock execv and execve functions always raise an
833 exception as they would normally never return.
834 """
835 # A list of tuples containing (function name, first arg, args)
836 # of calls to execv or execve that have been made.
837 calls = []
838
839 def mock_execv(name, *args):
840 calls.append(('execv', name, args))
841 raise RuntimeError("execv called")
842
843 def mock_execve(name, *args):
844 calls.append(('execve', name, args))
845 raise OSError(errno.ENOTDIR, "execve called")
846
847 try:
848 orig_execv = os.execv
849 orig_execve = os.execve
850 orig_defpath = os.defpath
851 os.execv = mock_execv
852 os.execve = mock_execve
853 if defpath is not None:
854 os.defpath = defpath
855 yield calls
856 finally:
857 os.execv = orig_execv
858 os.execve = orig_execve
859 os.defpath = orig_defpath
860
Guido van Rossume7ba4952007-06-06 23:52:48 +0000861class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000862 @unittest.skipIf(USING_LINUXTHREADS,
863 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +0000864 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000865 self.assertRaises(OSError, os.execvpe, 'no such app-',
866 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +0000867
Thomas Heller6790d602007-08-30 17:15:14 +0000868 def test_execvpe_with_bad_arglist(self):
869 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
870
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000871 @unittest.skipUnless(hasattr(os, '_execvpe'),
872 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +0000873 def _test_internal_execvpe(self, test_type):
874 program_path = os.sep + 'absolutepath'
875 if test_type is bytes:
876 program = b'executable'
877 fullpath = os.path.join(os.fsencode(program_path), program)
878 native_fullpath = fullpath
879 arguments = [b'progname', 'arg1', 'arg2']
880 else:
881 program = 'executable'
882 arguments = ['progname', 'arg1', 'arg2']
883 fullpath = os.path.join(program_path, program)
884 if os.name != "nt":
885 native_fullpath = os.fsencode(fullpath)
886 else:
887 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000888 env = {'spam': 'beans'}
889
Victor Stinnerb745a742010-05-18 17:17:23 +0000890 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000891 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000892 self.assertRaises(RuntimeError,
893 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000894 self.assertEqual(len(calls), 1)
895 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
896
Victor Stinnerb745a742010-05-18 17:17:23 +0000897 # test os._execvpe() with a relative path:
898 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000899 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000900 self.assertRaises(OSError,
901 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000902 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +0000903 self.assertSequenceEqual(calls[0],
904 ('execve', native_fullpath, (arguments, env)))
905
906 # test os._execvpe() with a relative path:
907 # os.get_exec_path() reads the 'PATH' variable
908 with _execvpe_mockup() as calls:
909 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +0000910 if test_type is bytes:
911 env_path[b'PATH'] = program_path
912 else:
913 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +0000914 self.assertRaises(OSError,
915 os._execvpe, program, arguments, env=env_path)
916 self.assertEqual(len(calls), 1)
917 self.assertSequenceEqual(calls[0],
918 ('execve', native_fullpath, (arguments, env_path)))
919
920 def test_internal_execvpe_str(self):
921 self._test_internal_execvpe(str)
922 if os.name != "nt":
923 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000924
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000925
Thomas Wouters477c8d52006-05-27 19:21:47 +0000926class Win32ErrorTests(unittest.TestCase):
927 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000928 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000929
930 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000931 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000932
933 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000934 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000935
936 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000937 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +0000938 try:
939 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
940 finally:
941 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000942 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000943
944 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000945 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000946
Thomas Wouters477c8d52006-05-27 19:21:47 +0000947 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +0000948 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000949
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000950class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000951 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000952 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
953 #singles.append("close")
954 #We omit close because it doesn'r raise an exception on some platforms
955 def get_single(f):
956 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000957 if hasattr(os, f):
958 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000959 return helper
960 for f in singles:
961 locals()["test_"+f] = get_single(f)
962
Benjamin Peterson7522c742009-01-19 21:00:09 +0000963 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000964 try:
965 f(support.make_bad_fd(), *args)
966 except OSError as e:
967 self.assertEqual(e.errno, errno.EBADF)
968 else:
969 self.fail("%r didn't raise a OSError with a bad file descriptor"
970 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000971
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000972 def test_isatty(self):
973 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000974 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000975
976 def test_closerange(self):
977 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000978 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000979 # Make sure none of the descriptors we are about to close are
980 # currently valid (issue 6542).
981 for i in range(10):
982 try: os.fstat(fd+i)
983 except OSError:
984 pass
985 else:
986 break
987 if i < 2:
988 raise unittest.SkipTest(
989 "Unable to acquire a range of invalid file descriptors")
990 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000991
992 def test_dup2(self):
993 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000994 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000995
996 def test_fchmod(self):
997 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000998 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000999
1000 def test_fchown(self):
1001 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001002 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001003
1004 def test_fpathconf(self):
1005 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001006 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001007
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001008 def test_ftruncate(self):
1009 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001010 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001011
1012 def test_lseek(self):
1013 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001014 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001015
1016 def test_read(self):
1017 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001018 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001019
1020 def test_tcsetpgrpt(self):
1021 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001022 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001023
1024 def test_write(self):
1025 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001026 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001027
Brian Curtin1b9df392010-11-24 20:24:31 +00001028
1029class LinkTests(unittest.TestCase):
1030 def setUp(self):
1031 self.file1 = support.TESTFN
1032 self.file2 = os.path.join(support.TESTFN + "2")
1033
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001034 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001035 for file in (self.file1, self.file2):
1036 if os.path.exists(file):
1037 os.unlink(file)
1038
Brian Curtin1b9df392010-11-24 20:24:31 +00001039 def _test_link(self, file1, file2):
1040 with open(file1, "w") as f1:
1041 f1.write("test")
1042
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001043 with warnings.catch_warnings():
1044 warnings.simplefilter("ignore", DeprecationWarning)
1045 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001046 with open(file1, "r") as f1, open(file2, "r") as f2:
1047 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1048
1049 def test_link(self):
1050 self._test_link(self.file1, self.file2)
1051
1052 def test_link_bytes(self):
1053 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1054 bytes(self.file2, sys.getfilesystemencoding()))
1055
Brian Curtinf498b752010-11-30 15:54:04 +00001056 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001057 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001058 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001059 except UnicodeError:
1060 raise unittest.SkipTest("Unable to encode for this platform.")
1061
Brian Curtinf498b752010-11-30 15:54:04 +00001062 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001063 self.file2 = self.file1 + "2"
1064 self._test_link(self.file1, self.file2)
1065
Thomas Wouters477c8d52006-05-27 19:21:47 +00001066if sys.platform != 'win32':
1067 class Win32ErrorTests(unittest.TestCase):
1068 pass
1069
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001070 class PosixUidGidTests(unittest.TestCase):
1071 if hasattr(os, 'setuid'):
1072 def test_setuid(self):
1073 if os.getuid() != 0:
1074 self.assertRaises(os.error, os.setuid, 0)
1075 self.assertRaises(OverflowError, os.setuid, 1<<32)
1076
1077 if hasattr(os, 'setgid'):
1078 def test_setgid(self):
1079 if os.getuid() != 0:
1080 self.assertRaises(os.error, os.setgid, 0)
1081 self.assertRaises(OverflowError, os.setgid, 1<<32)
1082
1083 if hasattr(os, 'seteuid'):
1084 def test_seteuid(self):
1085 if os.getuid() != 0:
1086 self.assertRaises(os.error, os.seteuid, 0)
1087 self.assertRaises(OverflowError, os.seteuid, 1<<32)
1088
1089 if hasattr(os, 'setegid'):
1090 def test_setegid(self):
1091 if os.getuid() != 0:
1092 self.assertRaises(os.error, os.setegid, 0)
1093 self.assertRaises(OverflowError, os.setegid, 1<<32)
1094
1095 if hasattr(os, 'setreuid'):
1096 def test_setreuid(self):
1097 if os.getuid() != 0:
1098 self.assertRaises(os.error, os.setreuid, 0, 0)
1099 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1100 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001101
1102 def test_setreuid_neg1(self):
1103 # Needs to accept -1. We run this in a subprocess to avoid
1104 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001105 subprocess.check_call([
1106 sys.executable, '-c',
1107 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001108
1109 if hasattr(os, 'setregid'):
1110 def test_setregid(self):
1111 if os.getuid() != 0:
1112 self.assertRaises(os.error, os.setregid, 0, 0)
1113 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1114 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001115
1116 def test_setregid_neg1(self):
1117 # Needs to accept -1. We run this in a subprocess to avoid
1118 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001119 subprocess.check_call([
1120 sys.executable, '-c',
1121 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001122
1123 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001124 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001125 if support.TESTFN_UNENCODABLE:
1126 self.dir = support.TESTFN_UNENCODABLE
1127 else:
1128 self.dir = support.TESTFN
1129 self.bdir = os.fsencode(self.dir)
1130
1131 bytesfn = []
1132 def add_filename(fn):
1133 try:
1134 fn = os.fsencode(fn)
1135 except UnicodeEncodeError:
1136 return
1137 bytesfn.append(fn)
1138 add_filename(support.TESTFN_UNICODE)
1139 if support.TESTFN_UNENCODABLE:
1140 add_filename(support.TESTFN_UNENCODABLE)
1141 if not bytesfn:
1142 self.skipTest("couldn't create any non-ascii filename")
1143
1144 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001145 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001146 try:
1147 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +02001148 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +00001149 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001150 if fn in self.unicodefn:
1151 raise ValueError("duplicate filename")
1152 self.unicodefn.add(fn)
1153 except:
1154 shutil.rmtree(self.dir)
1155 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001156
1157 def tearDown(self):
1158 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001159
1160 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001161 expected = self.unicodefn
1162 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001163 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001164
1165 def test_open(self):
1166 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001167 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +00001168 f.close()
1169
1170 def test_stat(self):
1171 for fn in self.unicodefn:
1172 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001173else:
1174 class PosixUidGidTests(unittest.TestCase):
1175 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001176 class Pep383Tests(unittest.TestCase):
1177 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001178
Brian Curtineb24d742010-04-12 17:16:38 +00001179@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1180class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001181 def _kill(self, sig):
1182 # Start sys.executable as a subprocess and communicate from the
1183 # subprocess to the parent that the interpreter is ready. When it
1184 # becomes ready, send *sig* via os.kill to the subprocess and check
1185 # that the return code is equal to *sig*.
1186 import ctypes
1187 from ctypes import wintypes
1188 import msvcrt
1189
1190 # Since we can't access the contents of the process' stdout until the
1191 # process has exited, use PeekNamedPipe to see what's inside stdout
1192 # without waiting. This is done so we can tell that the interpreter
1193 # is started and running at a point where it could handle a signal.
1194 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1195 PeekNamedPipe.restype = wintypes.BOOL
1196 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1197 ctypes.POINTER(ctypes.c_char), # stdout buf
1198 wintypes.DWORD, # Buffer size
1199 ctypes.POINTER(wintypes.DWORD), # bytes read
1200 ctypes.POINTER(wintypes.DWORD), # bytes avail
1201 ctypes.POINTER(wintypes.DWORD)) # bytes left
1202 msg = "running"
1203 proc = subprocess.Popen([sys.executable, "-c",
1204 "import sys;"
1205 "sys.stdout.write('{}');"
1206 "sys.stdout.flush();"
1207 "input()".format(msg)],
1208 stdout=subprocess.PIPE,
1209 stderr=subprocess.PIPE,
1210 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001211 self.addCleanup(proc.stdout.close)
1212 self.addCleanup(proc.stderr.close)
1213 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001214
1215 count, max = 0, 100
1216 while count < max and proc.poll() is None:
1217 # Create a string buffer to store the result of stdout from the pipe
1218 buf = ctypes.create_string_buffer(len(msg))
1219 # Obtain the text currently in proc.stdout
1220 # Bytes read/avail/left are left as NULL and unused
1221 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1222 buf, ctypes.sizeof(buf), None, None, None)
1223 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1224 if buf.value:
1225 self.assertEqual(msg, buf.value.decode())
1226 break
1227 time.sleep(0.1)
1228 count += 1
1229 else:
1230 self.fail("Did not receive communication from the subprocess")
1231
Brian Curtineb24d742010-04-12 17:16:38 +00001232 os.kill(proc.pid, sig)
1233 self.assertEqual(proc.wait(), sig)
1234
1235 def test_kill_sigterm(self):
1236 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001237 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001238
1239 def test_kill_int(self):
1240 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001241 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001242
1243 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001244 tagname = "test_os_%s" % uuid.uuid1()
1245 m = mmap.mmap(-1, 1, tagname)
1246 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001247 # Run a script which has console control handling enabled.
1248 proc = subprocess.Popen([sys.executable,
1249 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001250 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001251 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1252 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001253 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001254 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001255 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001256 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001257 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001258 count += 1
1259 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001260 # Forcefully kill the process if we weren't able to signal it.
1261 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001262 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001263 os.kill(proc.pid, event)
1264 # proc.send_signal(event) could also be done here.
1265 # Allow time for the signal to be passed and the process to exit.
1266 time.sleep(0.5)
1267 if not proc.poll():
1268 # Forcefully kill the process if we weren't able to signal it.
1269 os.kill(proc.pid, signal.SIGINT)
1270 self.fail("subprocess did not stop on {}".format(name))
1271
1272 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1273 def test_CTRL_C_EVENT(self):
1274 from ctypes import wintypes
1275 import ctypes
1276
1277 # Make a NULL value by creating a pointer with no argument.
1278 NULL = ctypes.POINTER(ctypes.c_int)()
1279 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1280 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1281 wintypes.BOOL)
1282 SetConsoleCtrlHandler.restype = wintypes.BOOL
1283
1284 # Calling this with NULL and FALSE causes the calling process to
1285 # handle CTRL+C, rather than ignore it. This property is inherited
1286 # by subprocesses.
1287 SetConsoleCtrlHandler(NULL, 0)
1288
1289 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1290
1291 def test_CTRL_BREAK_EVENT(self):
1292 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1293
1294
Brian Curtind40e6f72010-07-08 21:39:08 +00001295@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001296@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001297class Win32SymlinkTests(unittest.TestCase):
1298 filelink = 'filelinktest'
1299 filelink_target = os.path.abspath(__file__)
1300 dirlink = 'dirlinktest'
1301 dirlink_target = os.path.dirname(filelink_target)
1302 missing_link = 'missing link'
1303
1304 def setUp(self):
1305 assert os.path.exists(self.dirlink_target)
1306 assert os.path.exists(self.filelink_target)
1307 assert not os.path.exists(self.dirlink)
1308 assert not os.path.exists(self.filelink)
1309 assert not os.path.exists(self.missing_link)
1310
1311 def tearDown(self):
1312 if os.path.exists(self.filelink):
1313 os.remove(self.filelink)
1314 if os.path.exists(self.dirlink):
1315 os.rmdir(self.dirlink)
1316 if os.path.lexists(self.missing_link):
1317 os.remove(self.missing_link)
1318
1319 def test_directory_link(self):
Antoine Pitrou5311c1d2012-01-24 08:59:28 +01001320 os.symlink(self.dirlink_target, self.dirlink, True)
Brian Curtind40e6f72010-07-08 21:39:08 +00001321 self.assertTrue(os.path.exists(self.dirlink))
1322 self.assertTrue(os.path.isdir(self.dirlink))
1323 self.assertTrue(os.path.islink(self.dirlink))
1324 self.check_stat(self.dirlink, self.dirlink_target)
1325
1326 def test_file_link(self):
1327 os.symlink(self.filelink_target, self.filelink)
1328 self.assertTrue(os.path.exists(self.filelink))
1329 self.assertTrue(os.path.isfile(self.filelink))
1330 self.assertTrue(os.path.islink(self.filelink))
1331 self.check_stat(self.filelink, self.filelink_target)
1332
1333 def _create_missing_dir_link(self):
1334 'Create a "directory" link to a non-existent target'
1335 linkname = self.missing_link
1336 if os.path.lexists(linkname):
1337 os.remove(linkname)
1338 target = r'c:\\target does not exist.29r3c740'
1339 assert not os.path.exists(target)
1340 target_is_dir = True
1341 os.symlink(target, linkname, target_is_dir)
1342
1343 def test_remove_directory_link_to_missing_target(self):
1344 self._create_missing_dir_link()
1345 # For compatibility with Unix, os.remove will check the
1346 # directory status and call RemoveDirectory if the symlink
1347 # was created with target_is_dir==True.
1348 os.remove(self.missing_link)
1349
1350 @unittest.skip("currently fails; consider for improvement")
1351 def test_isdir_on_directory_link_to_missing_target(self):
1352 self._create_missing_dir_link()
1353 # consider having isdir return true for directory links
1354 self.assertTrue(os.path.isdir(self.missing_link))
1355
1356 @unittest.skip("currently fails; consider for improvement")
1357 def test_rmdir_on_directory_link_to_missing_target(self):
1358 self._create_missing_dir_link()
1359 # consider allowing rmdir to remove directory links
1360 os.rmdir(self.missing_link)
1361
1362 def check_stat(self, link, target):
1363 self.assertEqual(os.stat(link), os.stat(target))
1364 self.assertNotEqual(os.lstat(link), os.stat(link))
1365
Brian Curtind25aef52011-06-13 15:16:04 -05001366 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001367 with warnings.catch_warnings():
1368 warnings.simplefilter("ignore", DeprecationWarning)
1369 self.assertEqual(os.stat(bytes_link), os.stat(target))
1370 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001371
1372 def test_12084(self):
1373 level1 = os.path.abspath(support.TESTFN)
1374 level2 = os.path.join(level1, "level2")
1375 level3 = os.path.join(level2, "level3")
1376 try:
1377 os.mkdir(level1)
1378 os.mkdir(level2)
1379 os.mkdir(level3)
1380
1381 file1 = os.path.abspath(os.path.join(level1, "file1"))
1382
1383 with open(file1, "w") as f:
1384 f.write("file1")
1385
1386 orig_dir = os.getcwd()
1387 try:
1388 os.chdir(level2)
1389 link = os.path.join(level2, "link")
1390 os.symlink(os.path.relpath(file1), "link")
1391 self.assertIn("link", os.listdir(os.getcwd()))
1392
1393 # Check os.stat calls from the same dir as the link
1394 self.assertEqual(os.stat(file1), os.stat("link"))
1395
1396 # Check os.stat calls from a dir below the link
1397 os.chdir(level1)
1398 self.assertEqual(os.stat(file1),
1399 os.stat(os.path.relpath(link)))
1400
1401 # Check os.stat calls from a dir above the link
1402 os.chdir(level3)
1403 self.assertEqual(os.stat(file1),
1404 os.stat(os.path.relpath(link)))
1405 finally:
1406 os.chdir(orig_dir)
1407 except OSError as err:
1408 self.fail(err)
1409 finally:
1410 os.remove(file1)
1411 shutil.rmtree(level1)
1412
Brian Curtind40e6f72010-07-08 21:39:08 +00001413
Victor Stinnere8d51452010-08-19 01:05:19 +00001414class FSEncodingTests(unittest.TestCase):
1415 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001416 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1417 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001418
Victor Stinnere8d51452010-08-19 01:05:19 +00001419 def test_identity(self):
1420 # assert fsdecode(fsencode(x)) == x
1421 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1422 try:
1423 bytesfn = os.fsencode(fn)
1424 except UnicodeEncodeError:
1425 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001426 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001427
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001428
Brett Cannonefb00c02012-02-29 18:31:31 -05001429
1430class DeviceEncodingTests(unittest.TestCase):
1431
1432 def test_bad_fd(self):
1433 # Return None when an fd doesn't actually exist.
1434 self.assertIsNone(os.device_encoding(123456))
1435
Philip Jenveye308b7c2012-02-29 16:16:15 -08001436 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
1437 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08001438 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05001439 def test_device_encoding(self):
1440 encoding = os.device_encoding(0)
1441 self.assertIsNotNone(encoding)
1442 self.assertTrue(codecs.lookup(encoding))
1443
1444
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001445class PidTests(unittest.TestCase):
1446 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1447 def test_getppid(self):
1448 p = subprocess.Popen([sys.executable, '-c',
1449 'import os; print(os.getppid())'],
1450 stdout=subprocess.PIPE)
1451 stdout, _ = p.communicate()
1452 # We are the parent of our subprocess
1453 self.assertEqual(int(stdout), os.getpid())
1454
1455
Brian Curtin0151b8e2010-09-24 13:43:43 +00001456# The introduction of this TestCase caused at least two different errors on
1457# *nix buildbots. Temporarily skip this to let the buildbots move along.
1458@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001459@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1460class LoginTests(unittest.TestCase):
1461 def test_getlogin(self):
1462 user_name = os.getlogin()
1463 self.assertNotEqual(len(user_name), 0)
1464
1465
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001466@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1467 "needs os.getpriority and os.setpriority")
1468class ProgramPriorityTests(unittest.TestCase):
1469 """Tests for os.getpriority() and os.setpriority()."""
1470
1471 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001472
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001473 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1474 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1475 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001476 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1477 if base >= 19 and new_prio <= 19:
1478 raise unittest.SkipTest(
1479 "unable to reliably test setpriority at current nice level of %s" % base)
1480 else:
1481 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001482 finally:
1483 try:
1484 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1485 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001486 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001487 raise
1488
1489
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001490if threading is not None:
1491 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001492
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001493 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001494
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001495 def __init__(self, conn):
1496 asynchat.async_chat.__init__(self, conn)
1497 self.in_buffer = []
1498 self.closed = False
1499 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001500
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001501 def handle_read(self):
1502 data = self.recv(4096)
1503 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001504
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001505 def get_data(self):
1506 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001507
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001508 def handle_close(self):
1509 self.close()
1510 self.closed = True
1511
1512 def handle_error(self):
1513 raise
1514
1515 def __init__(self, address):
1516 threading.Thread.__init__(self)
1517 asyncore.dispatcher.__init__(self)
1518 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1519 self.bind(address)
1520 self.listen(5)
1521 self.host, self.port = self.socket.getsockname()[:2]
1522 self.handler_instance = None
1523 self._active = False
1524 self._active_lock = threading.Lock()
1525
1526 # --- public API
1527
1528 @property
1529 def running(self):
1530 return self._active
1531
1532 def start(self):
1533 assert not self.running
1534 self.__flag = threading.Event()
1535 threading.Thread.start(self)
1536 self.__flag.wait()
1537
1538 def stop(self):
1539 assert self.running
1540 self._active = False
1541 self.join()
1542
1543 def wait(self):
1544 # wait for handler connection to be closed, then stop the server
1545 while not getattr(self.handler_instance, "closed", False):
1546 time.sleep(0.001)
1547 self.stop()
1548
1549 # --- internals
1550
1551 def run(self):
1552 self._active = True
1553 self.__flag.set()
1554 while self._active and asyncore.socket_map:
1555 self._active_lock.acquire()
1556 asyncore.loop(timeout=0.001, count=1)
1557 self._active_lock.release()
1558 asyncore.close_all()
1559
1560 def handle_accept(self):
1561 conn, addr = self.accept()
1562 self.handler_instance = self.Handler(conn)
1563
1564 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001565 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001566 handle_read = handle_connect
1567
1568 def writable(self):
1569 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001570
1571 def handle_error(self):
1572 raise
1573
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001574
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001575@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001576@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1577class TestSendfile(unittest.TestCase):
1578
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001579 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001580 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001581 not sys.platform.startswith("solaris") and \
1582 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001583
1584 @classmethod
1585 def setUpClass(cls):
1586 with open(support.TESTFN, "wb") as f:
1587 f.write(cls.DATA)
1588
1589 @classmethod
1590 def tearDownClass(cls):
1591 support.unlink(support.TESTFN)
1592
1593 def setUp(self):
1594 self.server = SendfileTestServer((support.HOST, 0))
1595 self.server.start()
1596 self.client = socket.socket()
1597 self.client.connect((self.server.host, self.server.port))
1598 self.client.settimeout(1)
1599 # synchronize by waiting for "220 ready" response
1600 self.client.recv(1024)
1601 self.sockno = self.client.fileno()
1602 self.file = open(support.TESTFN, 'rb')
1603 self.fileno = self.file.fileno()
1604
1605 def tearDown(self):
1606 self.file.close()
1607 self.client.close()
1608 if self.server.running:
1609 self.server.stop()
1610
1611 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1612 """A higher level wrapper representing how an application is
1613 supposed to use sendfile().
1614 """
1615 while 1:
1616 try:
1617 if self.SUPPORT_HEADERS_TRAILERS:
1618 return os.sendfile(sock, file, offset, nbytes, headers,
1619 trailers)
1620 else:
1621 return os.sendfile(sock, file, offset, nbytes)
1622 except OSError as err:
1623 if err.errno == errno.ECONNRESET:
1624 # disconnected
1625 raise
1626 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1627 # we have to retry send data
1628 continue
1629 else:
1630 raise
1631
1632 def test_send_whole_file(self):
1633 # normal send
1634 total_sent = 0
1635 offset = 0
1636 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001637 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001638 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1639 if sent == 0:
1640 break
1641 offset += sent
1642 total_sent += sent
1643 self.assertTrue(sent <= nbytes)
1644 self.assertEqual(offset, total_sent)
1645
1646 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001647 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001648 self.client.close()
1649 self.server.wait()
1650 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001651 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001652 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001653
1654 def test_send_at_certain_offset(self):
1655 # start sending a file at a certain offset
1656 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001657 offset = len(self.DATA) // 2
1658 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001659 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001660 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001661 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1662 if sent == 0:
1663 break
1664 offset += sent
1665 total_sent += sent
1666 self.assertTrue(sent <= nbytes)
1667
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001668 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001669 self.client.close()
1670 self.server.wait()
1671 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001672 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001673 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001674 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001675 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001676
1677 def test_offset_overflow(self):
1678 # specify an offset > file size
1679 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001680 try:
1681 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1682 except OSError as e:
1683 # Solaris can raise EINVAL if offset >= file length, ignore.
1684 if e.errno != errno.EINVAL:
1685 raise
1686 else:
1687 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001688 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001689 self.client.close()
1690 self.server.wait()
1691 data = self.server.handler_instance.get_data()
1692 self.assertEqual(data, b'')
1693
1694 def test_invalid_offset(self):
1695 with self.assertRaises(OSError) as cm:
1696 os.sendfile(self.sockno, self.fileno, -1, 4096)
1697 self.assertEqual(cm.exception.errno, errno.EINVAL)
1698
1699 # --- headers / trailers tests
1700
1701 if SUPPORT_HEADERS_TRAILERS:
1702
1703 def test_headers(self):
1704 total_sent = 0
1705 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1706 headers=[b"x" * 512])
1707 total_sent += sent
1708 offset = 4096
1709 nbytes = 4096
1710 while 1:
1711 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1712 offset, nbytes)
1713 if sent == 0:
1714 break
1715 total_sent += sent
1716 offset += sent
1717
1718 expected_data = b"x" * 512 + self.DATA
1719 self.assertEqual(total_sent, len(expected_data))
1720 self.client.close()
1721 self.server.wait()
1722 data = self.server.handler_instance.get_data()
1723 self.assertEqual(hash(data), hash(expected_data))
1724
1725 def test_trailers(self):
1726 TESTFN2 = support.TESTFN + "2"
Brett Cannonb6376802011-03-15 17:38:22 -04001727 with open(TESTFN2, 'wb') as f:
1728 f.write(b"abcde")
1729 with open(TESTFN2, 'rb')as f:
1730 self.addCleanup(os.remove, TESTFN2)
1731 os.sendfile(self.sockno, f.fileno(), 0, 4096,
1732 trailers=[b"12345"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001733 self.client.close()
1734 self.server.wait()
1735 data = self.server.handler_instance.get_data()
1736 self.assertEqual(data, b"abcde12345")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001737
1738 if hasattr(os, "SF_NODISKIO"):
1739 def test_flags(self):
1740 try:
1741 os.sendfile(self.sockno, self.fileno, 0, 4096,
1742 flags=os.SF_NODISKIO)
1743 except OSError as err:
1744 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1745 raise
1746
1747
Benjamin Peterson799bd802011-08-31 22:15:17 -04001748def supports_extended_attributes():
1749 if not hasattr(os, "setxattr"):
1750 return False
1751 try:
1752 with open(support.TESTFN, "wb") as fp:
1753 try:
1754 os.fsetxattr(fp.fileno(), b"user.test", b"")
1755 except OSError as e:
1756 if e.errno != errno.ENOTSUP:
1757 raise
1758 return False
1759 finally:
1760 support.unlink(support.TESTFN)
1761 # Kernels < 2.6.39 don't respect setxattr flags.
1762 kernel_version = platform.release()
1763 m = re.match("2.6.(\d{1,2})", kernel_version)
1764 return m is None or int(m.group(1)) >= 39
1765
1766
1767@unittest.skipUnless(supports_extended_attributes(),
1768 "no non-broken extended attribute support")
1769class ExtendedAttributeTests(unittest.TestCase):
1770
1771 def tearDown(self):
1772 support.unlink(support.TESTFN)
1773
1774 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
1775 fn = support.TESTFN
1776 open(fn, "wb").close()
1777 with self.assertRaises(OSError) as cm:
1778 getxattr(fn, s("user.test"))
1779 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001780 init_xattr = listxattr(fn)
1781 self.assertIsInstance(init_xattr, list)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001782 setxattr(fn, s("user.test"), b"")
Victor Stinnerf12e5062011-10-16 22:12:03 +02001783 xattr = set(init_xattr)
1784 xattr.add("user.test")
1785 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001786 self.assertEqual(getxattr(fn, b"user.test"), b"")
1787 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE)
1788 self.assertEqual(getxattr(fn, b"user.test"), b"hello")
1789 with self.assertRaises(OSError) as cm:
1790 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE)
1791 self.assertEqual(cm.exception.errno, errno.EEXIST)
1792 with self.assertRaises(OSError) as cm:
1793 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE)
1794 self.assertEqual(cm.exception.errno, errno.ENODATA)
1795 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001796 xattr.add("user.test2")
1797 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001798 removexattr(fn, s("user.test"))
1799 with self.assertRaises(OSError) as cm:
1800 getxattr(fn, s("user.test"))
1801 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001802 xattr.remove("user.test")
1803 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001804 self.assertEqual(getxattr(fn, s("user.test2")), b"foo")
1805 setxattr(fn, s("user.test"), b"a"*1024)
1806 self.assertEqual(getxattr(fn, s("user.test")), b"a"*1024)
1807 removexattr(fn, s("user.test"))
1808 many = sorted("user.test{}".format(i) for i in range(100))
1809 for thing in many:
1810 setxattr(fn, thing, b"x")
Victor Stinnerf12e5062011-10-16 22:12:03 +02001811 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04001812
1813 def _check_xattrs(self, *args):
1814 def make_bytes(s):
1815 return bytes(s, "ascii")
1816 self._check_xattrs_str(str, *args)
1817 support.unlink(support.TESTFN)
1818 self._check_xattrs_str(make_bytes, *args)
1819
1820 def test_simple(self):
1821 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
1822 os.listxattr)
1823
1824 def test_lpath(self):
1825 self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
1826 os.llistxattr)
1827
1828 def test_fds(self):
1829 def getxattr(path, *args):
1830 with open(path, "rb") as fp:
1831 return os.fgetxattr(fp.fileno(), *args)
1832 def setxattr(path, *args):
1833 with open(path, "wb") as fp:
1834 os.fsetxattr(fp.fileno(), *args)
1835 def removexattr(path, *args):
1836 with open(path, "wb") as fp:
1837 os.fremovexattr(fp.fileno(), *args)
1838 def listxattr(path, *args):
1839 with open(path, "rb") as fp:
1840 return os.flistxattr(fp.fileno(), *args)
1841 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1842
1843
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001844@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1845class Win32DeprecatedBytesAPI(unittest.TestCase):
1846 def test_deprecated(self):
1847 import nt
1848 filename = os.fsencode(support.TESTFN)
1849 with warnings.catch_warnings():
1850 warnings.simplefilter("error", DeprecationWarning)
1851 for func, *args in (
1852 (nt._getfullpathname, filename),
1853 (nt._isdir, filename),
1854 (os.access, filename, os.R_OK),
1855 (os.chdir, filename),
1856 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01001857 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001858 (os.link, filename, filename),
1859 (os.listdir, filename),
1860 (os.lstat, filename),
1861 (os.mkdir, filename),
1862 (os.open, filename, os.O_RDONLY),
1863 (os.rename, filename, filename),
1864 (os.rmdir, filename),
1865 (os.startfile, filename),
1866 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001867 (os.unlink, filename),
1868 (os.utime, filename),
1869 ):
1870 self.assertRaises(DeprecationWarning, func, *args)
1871
Victor Stinner28216442011-11-16 00:34:44 +01001872 @support.skip_unless_symlink
1873 def test_symlink(self):
1874 filename = os.fsencode(support.TESTFN)
1875 with warnings.catch_warnings():
1876 warnings.simplefilter("error", DeprecationWarning)
1877 self.assertRaises(DeprecationWarning,
1878 os.symlink, filename, filename)
1879
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001880
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001881@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
1882class TermsizeTests(unittest.TestCase):
1883 def test_does_not_crash(self):
1884 """Check if get_terminal_size() returns a meaningful value.
1885
1886 There's no easy portable way to actually check the size of the
1887 terminal, so let's check if it returns something sensible instead.
1888 """
1889 try:
1890 size = os.get_terminal_size()
1891 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01001892 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001893 # Under win32 a generic OSError can be thrown if the
1894 # handle cannot be retrieved
1895 self.skipTest("failed to query terminal size")
1896 raise
1897
Antoine Pitroucfade362012-02-08 23:48:59 +01001898 self.assertGreaterEqual(size.columns, 0)
1899 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001900
1901 def test_stty_match(self):
1902 """Check if stty returns the same results
1903
1904 stty actually tests stdin, so get_terminal_size is invoked on
1905 stdin explicitly. If stty succeeded, then get_terminal_size()
1906 should work too.
1907 """
1908 try:
1909 size = subprocess.check_output(['stty', 'size']).decode().split()
1910 except (FileNotFoundError, subprocess.CalledProcessError):
1911 self.skipTest("stty invocation failed")
1912 expected = (int(size[1]), int(size[0])) # reversed order
1913
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01001914 try:
1915 actual = os.get_terminal_size(sys.__stdin__.fileno())
1916 except OSError as e:
1917 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
1918 # Under win32 a generic OSError can be thrown if the
1919 # handle cannot be retrieved
1920 self.skipTest("failed to query terminal size")
1921 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001922 self.assertEqual(expected, actual)
1923
1924
Antoine Pitrouf26ad712011-07-15 23:00:56 +02001925@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00001926def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001927 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00001928 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001929 StatAttributeTests,
1930 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001931 WalkTests,
Charles-François Natali7372b062012-02-05 15:15:38 +01001932 FwalkTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001933 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001934 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00001935 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00001936 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001937 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001938 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00001939 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00001940 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001941 Win32KillTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00001942 Win32SymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00001943 FSEncodingTests,
Brett Cannonefb00c02012-02-29 18:31:31 -05001944 DeviceEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001945 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001946 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00001947 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001948 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001949 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04001950 ExtendedAttributeTests,
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001951 Win32DeprecatedBytesAPI,
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001952 TermsizeTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001953 )
Fred Drake2e2be372001-09-20 21:33:42 +00001954
1955if __name__ == "__main__":
1956 test_main()