blob: a0f13fd790b0bd6183d00b0f25e278b035ce8324 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000025try:
26 import threading
27except ImportError:
28 threading = None
Georg Brandl2daf6ae2012-02-20 19:54:16 +010029from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000030
Victor Stinner1aa54a42012-02-08 04:09:37 +010031os.stat_float_times(True)
32st = os.stat(__file__)
33stat_supports_subsecond = (
34 # check if float and int timestamps are different
35 (st.st_atime != st[7])
36 or (st.st_mtime != st[8])
37 or (st.st_ctime != st[9]))
38
Mark Dickinson7cf03892010-04-16 13:45:35 +000039# Detect whether we're on a Linux system that uses the (now outdated
40# and unmaintained) linuxthreads threading library. There's an issue
41# when combining linuxthreads with a failed execv call: see
42# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020043if hasattr(sys, 'thread_info') and sys.thread_info.version:
44 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
45else:
46 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000047
Thomas Wouters0e3f5912006-08-11 14:57:12 +000048# Tests creating TESTFN
49class FileTests(unittest.TestCase):
50 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000051 if os.path.exists(support.TESTFN):
52 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000053 tearDown = setUp
54
55 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000056 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000057 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000058 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000059
Christian Heimesfdab48e2008-01-20 09:06:41 +000060 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000061 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
62 # We must allocate two consecutive file descriptors, otherwise
63 # it will mess up other file descriptors (perhaps even the three
64 # standard ones).
65 second = os.dup(first)
66 try:
67 retries = 0
68 while second != first + 1:
69 os.close(first)
70 retries += 1
71 if retries > 10:
72 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000073 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000074 first, second = second, os.dup(second)
75 finally:
76 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000077 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000078 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000079 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000080
Benjamin Peterson1cc6df92010-06-30 17:39:45 +000081 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +000082 def test_rename(self):
83 path = support.TESTFN
84 old = sys.getrefcount(path)
85 self.assertRaises(TypeError, os.rename, path, 0)
86 new = sys.getrefcount(path)
87 self.assertEqual(old, new)
88
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000089 def test_read(self):
90 with open(support.TESTFN, "w+b") as fobj:
91 fobj.write(b"spam")
92 fobj.flush()
93 fd = fobj.fileno()
94 os.lseek(fd, 0, 0)
95 s = os.read(fd, 4)
96 self.assertEqual(type(s), bytes)
97 self.assertEqual(s, b"spam")
98
99 def test_write(self):
100 # os.write() accepts bytes- and buffer-like objects but not strings
101 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
102 self.assertRaises(TypeError, os.write, fd, "beans")
103 os.write(fd, b"bacon\n")
104 os.write(fd, bytearray(b"eggs\n"))
105 os.write(fd, memoryview(b"spam\n"))
106 os.close(fd)
107 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000108 self.assertEqual(fobj.read().splitlines(),
109 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000110
Victor Stinnere0daff12011-03-20 23:36:35 +0100111 def write_windows_console(self, *args):
112 retcode = subprocess.call(args,
113 # use a new console to not flood the test output
114 creationflags=subprocess.CREATE_NEW_CONSOLE,
115 # use a shell to hide the console window (SW_HIDE)
116 shell=True)
117 self.assertEqual(retcode, 0)
118
119 @unittest.skipUnless(sys.platform == 'win32',
120 'test specific to the Windows console')
121 def test_write_windows_console(self):
122 # Issue #11395: the Windows console returns an error (12: not enough
123 # space error) on writing into stdout if stdout mode is binary and the
124 # length is greater than 66,000 bytes (or less, depending on heap
125 # usage).
126 code = "print('x' * 100000)"
127 self.write_windows_console(sys.executable, "-c", code)
128 self.write_windows_console(sys.executable, "-u", "-c", code)
129
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000130 def fdopen_helper(self, *args):
131 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200132 f = os.fdopen(fd, *args)
133 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000134
135 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200136 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
137 os.close(fd)
138
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000139 self.fdopen_helper()
140 self.fdopen_helper('r')
141 self.fdopen_helper('r', 100)
142
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100143 def test_replace(self):
144 TESTFN2 = support.TESTFN + ".2"
145 with open(support.TESTFN, 'w') as f:
146 f.write("1")
147 with open(TESTFN2, 'w') as f:
148 f.write("2")
149 self.addCleanup(os.unlink, TESTFN2)
150 os.replace(support.TESTFN, TESTFN2)
151 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
152 with open(TESTFN2, 'r') as f:
153 self.assertEqual(f.read(), "1")
154
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200155
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000156# Test attributes on return values from os.*stat* family.
157class StatAttributeTests(unittest.TestCase):
158 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000159 os.mkdir(support.TESTFN)
160 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000161 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000162 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000163 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000164
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000165 def tearDown(self):
166 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000167 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000168
Antoine Pitrou38425292010-09-21 18:19:07 +0000169 def check_stat_attributes(self, fname):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000170 if not hasattr(os, "stat"):
171 return
172
Antoine Pitrou38425292010-09-21 18:19:07 +0000173 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000174
175 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000176 self.assertEqual(result[stat.ST_SIZE], 3)
177 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000178
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000179 # Make sure all the attributes are there
180 members = dir(result)
181 for name in dir(stat):
182 if name[:3] == 'ST_':
183 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000184 if name.endswith("TIME"):
185 def trunc(x): return int(x)
186 else:
187 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000188 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000189 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000190 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000191
192 try:
193 result[200]
194 self.fail("No exception thrown")
195 except IndexError:
196 pass
197
198 # Make sure that assignment fails
199 try:
200 result.st_mode = 1
201 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000202 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000203 pass
204
205 try:
206 result.st_rdev = 1
207 self.fail("No exception thrown")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000208 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000209 pass
210
211 try:
212 result.parrot = 1
213 self.fail("No exception thrown")
214 except AttributeError:
215 pass
216
217 # Use the stat_result constructor with a too-short tuple.
218 try:
219 result2 = os.stat_result((10,))
220 self.fail("No exception thrown")
221 except TypeError:
222 pass
223
Ezio Melotti42da6632011-03-15 05:18:48 +0200224 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000225 try:
226 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
227 except TypeError:
228 pass
229
Antoine Pitrou38425292010-09-21 18:19:07 +0000230 def test_stat_attributes(self):
231 self.check_stat_attributes(self.fname)
232
233 def test_stat_attributes_bytes(self):
234 try:
235 fname = self.fname.encode(sys.getfilesystemencoding())
236 except UnicodeEncodeError:
237 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100238 with warnings.catch_warnings():
239 warnings.simplefilter("ignore", DeprecationWarning)
240 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000241
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000242 def test_statvfs_attributes(self):
243 if not hasattr(os, "statvfs"):
244 return
245
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000246 try:
247 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000248 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000249 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000250 if e.errno == errno.ENOSYS:
251 return
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000252
253 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000254 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000255
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000256 # Make sure all the attributes are there.
257 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
258 'ffree', 'favail', 'flag', 'namemax')
259 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000260 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000261
262 # Make sure that assignment really fails
263 try:
264 result.f_bfree = 1
265 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000266 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000267 pass
268
269 try:
270 result.parrot = 1
271 self.fail("No exception thrown")
272 except AttributeError:
273 pass
274
275 # Use the constructor with a too-short tuple.
276 try:
277 result2 = os.statvfs_result((10,))
278 self.fail("No exception thrown")
279 except TypeError:
280 pass
281
Ezio Melotti42da6632011-03-15 05:18:48 +0200282 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000283 try:
284 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
285 except TypeError:
286 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000287
Thomas Wouters89f507f2006-12-13 04:49:30 +0000288 def test_utime_dir(self):
289 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000290 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000291 # round to int, because some systems may support sub-second
292 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000293 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
294 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000295 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000296
Brian Curtin52fbea12011-11-06 13:41:17 -0600297 def test_utime_noargs(self):
Brian Curtin0277aa32011-11-06 13:50:15 -0600298 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600299 # second argument. Check that the previous methods of passing
300 # a time tuple or None work in addition to no argument.
301 st = os.stat(support.TESTFN)
302 # Doesn't set anything new, but sets the time tuple way
303 os.utime(support.TESTFN, (st.st_atime, st.st_mtime))
304 # Set to the current time in the old explicit way.
305 os.utime(support.TESTFN, None)
306 st1 = os.stat(support.TESTFN)
307 # Set to the current time in the new way
308 os.utime(support.TESTFN)
309 st2 = os.stat(support.TESTFN)
310 self.assertAlmostEqual(st1.st_mtime, st2.st_mtime, delta=10)
311
Victor Stinner1aa54a42012-02-08 04:09:37 +0100312 @unittest.skipUnless(stat_supports_subsecond,
313 "os.stat() doesn't has a subsecond resolution")
Victor Stinnera2f7c002012-02-08 03:36:25 +0100314 def _test_utime_subsecond(self, set_time_func):
Victor Stinnerbe557de2012-02-08 03:01:11 +0100315 asec, amsec = 1, 901
316 atime = asec + amsec * 1e-3
Victor Stinnera2f7c002012-02-08 03:36:25 +0100317 msec, mmsec = 2, 901
Victor Stinnerbe557de2012-02-08 03:01:11 +0100318 mtime = msec + mmsec * 1e-3
319 filename = self.fname
Victor Stinnera2f7c002012-02-08 03:36:25 +0100320 os.utime(filename, (0, 0))
321 set_time_func(filename, atime, mtime)
Victor Stinner1aa54a42012-02-08 04:09:37 +0100322 os.stat_float_times(True)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100323 st = os.stat(filename)
324 self.assertAlmostEqual(st.st_atime, atime, places=3)
325 self.assertAlmostEqual(st.st_mtime, mtime, places=3)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100326
Victor Stinnera2f7c002012-02-08 03:36:25 +0100327 def test_utime_subsecond(self):
328 def set_time(filename, atime, mtime):
329 os.utime(filename, (atime, mtime))
330 self._test_utime_subsecond(set_time)
331
332 @unittest.skipUnless(hasattr(os, 'futimes'),
333 "os.futimes required for this test.")
334 def test_futimes_subsecond(self):
335 def set_time(filename, atime, mtime):
336 with open(filename, "wb") as f:
337 os.futimes(f.fileno(), (atime, mtime))
338 self._test_utime_subsecond(set_time)
339
340 @unittest.skipUnless(hasattr(os, 'futimens'),
341 "os.futimens required for this test.")
342 def test_futimens_subsecond(self):
343 def set_time(filename, atime, mtime):
344 with open(filename, "wb") as f:
345 asec, ansec = divmod(atime, 1.0)
346 asec = int(asec)
347 ansec = int(ansec * 1e9)
348 msec, mnsec = divmod(mtime, 1.0)
349 msec = int(msec)
350 mnsec = int(mnsec * 1e9)
351 os.futimens(f.fileno(),
352 (asec, ansec),
353 (msec, mnsec))
354 self._test_utime_subsecond(set_time)
355
356 @unittest.skipUnless(hasattr(os, 'futimesat'),
357 "os.futimesat required for this test.")
358 def test_futimesat_subsecond(self):
359 def set_time(filename, atime, mtime):
360 dirname = os.path.dirname(filename)
361 dirfd = os.open(dirname, os.O_RDONLY)
362 try:
363 os.futimesat(dirfd, os.path.basename(filename),
364 (atime, mtime))
365 finally:
366 os.close(dirfd)
367 self._test_utime_subsecond(set_time)
368
369 @unittest.skipUnless(hasattr(os, 'lutimes'),
370 "os.lutimes required for this test.")
371 def test_lutimes_subsecond(self):
372 def set_time(filename, atime, mtime):
373 os.lutimes(filename, (atime, mtime))
374 self._test_utime_subsecond(set_time)
375
376 @unittest.skipUnless(hasattr(os, 'utimensat'),
377 "os.utimensat required for this test.")
378 def test_utimensat_subsecond(self):
379 def set_time(filename, atime, mtime):
380 dirname = os.path.dirname(filename)
381 dirfd = os.open(dirname, os.O_RDONLY)
382 try:
383 asec, ansec = divmod(atime, 1.0)
384 asec = int(asec)
385 ansec = int(ansec * 1e9)
386 msec, mnsec = divmod(mtime, 1.0)
387 msec = int(msec)
388 mnsec = int(mnsec * 1e9)
389 os.utimensat(dirfd, os.path.basename(filename),
390 (asec, ansec),
391 (msec, mnsec))
392 finally:
393 os.close(dirfd)
394 self._test_utime_subsecond(set_time)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100395
Thomas Wouters89f507f2006-12-13 04:49:30 +0000396 # Restrict test to Win32, since there is no guarantee other
397 # systems support centiseconds
398 if sys.platform == 'win32':
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000399 def get_file_system(path):
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000400 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000401 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000402 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000403 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000404 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000405 return buf.value
406
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000407 if get_file_system(support.TESTFN) == "NTFS":
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000408 def test_1565150(self):
409 t1 = 1159195039.25
410 os.utime(self.fname, (t1, t1))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000411 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000412
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000413 def test_large_time(self):
414 t1 = 5000000000 # some day in 2128
415 os.utime(self.fname, (t1, t1))
416 self.assertEqual(os.stat(self.fname).st_mtime, t1)
417
Guido van Rossumd8faa362007-04-27 19:54:29 +0000418 def test_1686475(self):
419 # Verify that an open file can be stat'ed
420 try:
421 os.stat(r"c:\pagefile.sys")
422 except WindowsError as e:
Benjamin Petersonc4fe6f32008-08-19 18:57:56 +0000423 if e.errno == 2: # file does not exist; cannot run test
Guido van Rossumd8faa362007-04-27 19:54:29 +0000424 return
425 self.fail("Could not stat pagefile.sys")
426
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000427from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000428
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000429class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000430 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000431 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000432
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000433 def setUp(self):
434 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000435 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000436 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000437 for key, value in self._reference().items():
438 os.environ[key] = value
439
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000440 def tearDown(self):
441 os.environ.clear()
442 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000443 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000444 os.environb.clear()
445 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000446
Christian Heimes90333392007-11-01 19:08:42 +0000447 def _reference(self):
448 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
449
450 def _empty_mapping(self):
451 os.environ.clear()
452 return os.environ
453
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000454 # Bug 1110478
Martin v. Löwis5510f652005-02-17 21:23:20 +0000455 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000456 os.environ.clear()
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000457 if os.path.exists("/bin/sh"):
458 os.environ.update(HELLO="World")
Brian Curtin810921b2010-10-30 21:24:21 +0000459 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
460 value = popen.read().strip()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000461 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000462
Christian Heimes1a13d592007-11-08 14:16:55 +0000463 def test_os_popen_iter(self):
464 if os.path.exists("/bin/sh"):
Brian Curtin810921b2010-10-30 21:24:21 +0000465 with os.popen(
466 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
467 it = iter(popen)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000468 self.assertEqual(next(it), "line1\n")
469 self.assertEqual(next(it), "line2\n")
470 self.assertEqual(next(it), "line3\n")
Brian Curtin810921b2010-10-30 21:24:21 +0000471 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000472
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000473 # Verify environ keys and values from the OS are of the
474 # correct str type.
475 def test_keyvalue_types(self):
476 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000477 self.assertEqual(type(key), str)
478 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000479
Christian Heimes90333392007-11-01 19:08:42 +0000480 def test_items(self):
481 for key, value in self._reference().items():
482 self.assertEqual(os.environ.get(key), value)
483
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000484 # Issue 7310
485 def test___repr__(self):
486 """Check that the repr() of os.environ looks like environ({...})."""
487 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000488 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
489 '{!r}: {!r}'.format(key, value)
490 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000491
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000492 def test_get_exec_path(self):
493 defpath_list = os.defpath.split(os.pathsep)
494 test_path = ['/monty', '/python', '', '/flying/circus']
495 test_env = {'PATH': os.pathsep.join(test_path)}
496
497 saved_environ = os.environ
498 try:
499 os.environ = dict(test_env)
500 # Test that defaulting to os.environ works.
501 self.assertSequenceEqual(test_path, os.get_exec_path())
502 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
503 finally:
504 os.environ = saved_environ
505
506 # No PATH environment variable
507 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
508 # Empty PATH environment variable
509 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
510 # Supplied PATH environment variable
511 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
512
Victor Stinnerb745a742010-05-18 17:17:23 +0000513 if os.supports_bytes_environ:
514 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000515 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000516 # ignore BytesWarning warning
517 with warnings.catch_warnings(record=True):
518 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000519 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000520 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000521 pass
522 else:
523 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000524
525 # bytes key and/or value
526 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
527 ['abc'])
528 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
529 ['abc'])
530 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
531 ['abc'])
532
533 @unittest.skipUnless(os.supports_bytes_environ,
534 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000535 def test_environb(self):
536 # os.environ -> os.environb
537 value = 'euro\u20ac'
538 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000539 value_bytes = value.encode(sys.getfilesystemencoding(),
540 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000541 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000542 msg = "U+20AC character is not encodable to %s" % (
543 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000544 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000545 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000546 self.assertEqual(os.environ['unicode'], value)
547 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000548
549 # os.environb -> os.environ
550 value = b'\xff'
551 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000552 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000553 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000554 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000555
Charles-François Natali2966f102011-11-26 11:32:46 +0100556 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
557 # #13415).
558 @support.requires_freebsd_version(7)
559 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100560 def test_unset_error(self):
561 if sys.platform == "win32":
562 # an environment variable is limited to 32,767 characters
563 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100564 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100565 else:
566 # "=" is not allowed in a variable name
567 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100568 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100569
Tim Petersc4e09402003-04-25 07:11:48 +0000570class WalkTests(unittest.TestCase):
571 """Tests for os.walk()."""
572
Charles-François Natali7372b062012-02-05 15:15:38 +0100573 def setUp(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000574 import os
575 from os.path import join
576
577 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000578 # TESTFN/
579 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000580 # tmp1
581 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000582 # tmp2
583 # SUB11/ no kids
584 # SUB2/ a file kid and a dirsymlink kid
585 # tmp3
586 # link/ a symlink to TESTFN.2
587 # TEST2/
588 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000589 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000590 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000591 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000592 sub2_path = join(walk_path, "SUB2")
593 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000594 tmp2_path = join(sub1_path, "tmp2")
595 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000596 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000597 t2_path = join(support.TESTFN, "TEST2")
598 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Tim Petersc4e09402003-04-25 07:11:48 +0000599
600 # Create stuff.
601 os.makedirs(sub11_path)
602 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000603 os.makedirs(t2_path)
604 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000605 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000606 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
607 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000608 if support.can_symlink():
Antoine Pitrou5311c1d2012-01-24 08:59:28 +0100609 if os.name == 'nt':
610 def symlink_to_dir(src, dest):
611 os.symlink(src, dest, True)
612 else:
613 symlink_to_dir = os.symlink
614 symlink_to_dir(os.path.abspath(t2_path), link_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000615 sub2_tree = (sub2_path, ["link"], ["tmp3"])
616 else:
617 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000618
619 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000620 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000621 self.assertEqual(len(all), 4)
622 # We can't know which order SUB1 and SUB2 will appear in.
623 # Not flipped: TESTFN, SUB1, SUB11, SUB2
624 # flipped: TESTFN, SUB2, SUB1, SUB11
625 flipped = all[0][1][0] != "SUB1"
626 all[0][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000627 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000628 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
629 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000630 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000631
632 # Prune the search.
633 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000634 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000635 all.append((root, dirs, files))
636 # Don't descend into SUB1.
637 if 'SUB1' in dirs:
638 # Note that this also mutates the dirs we appended to all!
639 dirs.remove('SUB1')
640 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000641 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
642 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000643
644 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000645 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000646 self.assertEqual(len(all), 4)
647 # We can't know which order SUB1 and SUB2 will appear in.
648 # Not flipped: SUB11, SUB1, SUB2, TESTFN
649 # flipped: SUB2, SUB11, SUB1, TESTFN
650 flipped = all[3][1][0] != "SUB1"
651 all[3][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000652 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000653 self.assertEqual(all[flipped], (sub11_path, [], []))
654 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000655 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000656
Brian Curtin3b4499c2010-12-28 14:31:47 +0000657 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000658 # Walk, following symlinks.
659 for root, dirs, files in os.walk(walk_path, followlinks=True):
660 if root == link_path:
661 self.assertEqual(dirs, [])
662 self.assertEqual(files, ["tmp4"])
663 break
664 else:
665 self.fail("Didn't follow symlink with followlinks=True")
666
667 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000668 # Tear everything down. This is a decent use for bottom-up on
669 # Windows, which doesn't have a recursive delete command. The
670 # (not so) subtlety is that rmdir will fail unless the dir's
671 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000672 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000673 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000674 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000675 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000676 dirname = os.path.join(root, name)
677 if not os.path.islink(dirname):
678 os.rmdir(dirname)
679 else:
680 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000681 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000682
Charles-François Natali7372b062012-02-05 15:15:38 +0100683
684@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
685class FwalkTests(WalkTests):
686 """Tests for os.fwalk()."""
687
688 def test_compare_to_walk(self):
689 # compare with walk() results
690 for topdown, followlinks in itertools.product((True, False), repeat=2):
691 args = support.TESTFN, topdown, None, followlinks
692 expected = {}
693 for root, dirs, files in os.walk(*args):
694 expected[root] = (set(dirs), set(files))
695
696 for root, dirs, files, rootfd in os.fwalk(*args):
697 self.assertIn(root, expected)
698 self.assertEqual(expected[root], (set(dirs), set(files)))
699
700 def test_dir_fd(self):
701 # check returned file descriptors
702 for topdown, followlinks in itertools.product((True, False), repeat=2):
703 args = support.TESTFN, topdown, None, followlinks
704 for root, dirs, files, rootfd in os.fwalk(*args):
705 # check that the FD is valid
706 os.fstat(rootfd)
Charles-François Natali77940902012-02-06 19:54:48 +0100707 # check that flistdir() returns consistent information
708 self.assertEqual(set(os.flistdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100709
710 def test_fd_leak(self):
711 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
712 # we both check that calling fwalk() a large number of times doesn't
713 # yield EMFILE, and that the minimum allocated FD hasn't changed.
714 minfd = os.dup(1)
715 os.close(minfd)
716 for i in range(256):
717 for x in os.fwalk(support.TESTFN):
718 pass
719 newfd = os.dup(1)
720 self.addCleanup(os.close, newfd)
721 self.assertEqual(newfd, minfd)
722
723 def tearDown(self):
724 # cleanup
725 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
726 for name in files:
727 os.unlinkat(rootfd, name)
728 for name in dirs:
729 st = os.fstatat(rootfd, name, os.AT_SYMLINK_NOFOLLOW)
730 if stat.S_ISDIR(st.st_mode):
731 os.unlinkat(rootfd, name, os.AT_REMOVEDIR)
732 else:
733 os.unlinkat(rootfd, name)
734 os.rmdir(support.TESTFN)
735
736
Guido van Rossume7ba4952007-06-06 23:52:48 +0000737class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000738 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000739 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000740
741 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000742 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000743 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
744 os.makedirs(path) # Should work
745 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
746 os.makedirs(path)
747
748 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000749 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000750 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
751 os.makedirs(path)
752 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
753 'dir5', 'dir6')
754 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000755
Terry Reedy5a22b652010-12-02 07:05:56 +0000756 def test_exist_ok_existing_directory(self):
757 path = os.path.join(support.TESTFN, 'dir1')
758 mode = 0o777
759 old_mask = os.umask(0o022)
760 os.makedirs(path, mode)
761 self.assertRaises(OSError, os.makedirs, path, mode)
762 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
763 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
764 os.makedirs(path, mode=mode, exist_ok=True)
765 os.umask(old_mask)
766
767 def test_exist_ok_existing_regular_file(self):
768 base = support.TESTFN
769 path = os.path.join(support.TESTFN, 'dir1')
770 f = open(path, 'w')
771 f.write('abc')
772 f.close()
773 self.assertRaises(OSError, os.makedirs, path)
774 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
775 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
776 os.remove(path)
777
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000778 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000779 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000780 'dir4', 'dir5', 'dir6')
781 # If the tests failed, the bottom-most directory ('../dir6')
782 # may not have been created, so we look for the outermost directory
783 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000784 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000785 path = os.path.dirname(path)
786
787 os.removedirs(path)
788
Guido van Rossume7ba4952007-06-06 23:52:48 +0000789class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000790 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +0200791 with open(os.devnull, 'wb') as f:
792 f.write(b'hello')
793 f.close()
794 with open(os.devnull, 'rb') as f:
795 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000796
Guido van Rossume7ba4952007-06-06 23:52:48 +0000797class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +0100798 def test_urandom_length(self):
799 self.assertEqual(len(os.urandom(0)), 0)
800 self.assertEqual(len(os.urandom(1)), 1)
801 self.assertEqual(len(os.urandom(10)), 10)
802 self.assertEqual(len(os.urandom(100)), 100)
803 self.assertEqual(len(os.urandom(1000)), 1000)
804
805 def test_urandom_value(self):
806 data1 = os.urandom(16)
807 data2 = os.urandom(16)
808 self.assertNotEqual(data1, data2)
809
810 def get_urandom_subprocess(self, count):
811 code = '\n'.join((
812 'import os, sys',
813 'data = os.urandom(%s)' % count,
814 'sys.stdout.buffer.write(data)',
815 'sys.stdout.buffer.flush()'))
816 out = assert_python_ok('-c', code)
817 stdout = out[1]
818 self.assertEqual(len(stdout), 16)
819 return stdout
820
821 def test_urandom_subprocess(self):
822 data1 = self.get_urandom_subprocess(16)
823 data2 = self.get_urandom_subprocess(16)
824 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000825
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000826@contextlib.contextmanager
827def _execvpe_mockup(defpath=None):
828 """
829 Stubs out execv and execve functions when used as context manager.
830 Records exec calls. The mock execv and execve functions always raise an
831 exception as they would normally never return.
832 """
833 # A list of tuples containing (function name, first arg, args)
834 # of calls to execv or execve that have been made.
835 calls = []
836
837 def mock_execv(name, *args):
838 calls.append(('execv', name, args))
839 raise RuntimeError("execv called")
840
841 def mock_execve(name, *args):
842 calls.append(('execve', name, args))
843 raise OSError(errno.ENOTDIR, "execve called")
844
845 try:
846 orig_execv = os.execv
847 orig_execve = os.execve
848 orig_defpath = os.defpath
849 os.execv = mock_execv
850 os.execve = mock_execve
851 if defpath is not None:
852 os.defpath = defpath
853 yield calls
854 finally:
855 os.execv = orig_execv
856 os.execve = orig_execve
857 os.defpath = orig_defpath
858
Guido van Rossume7ba4952007-06-06 23:52:48 +0000859class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000860 @unittest.skipIf(USING_LINUXTHREADS,
861 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +0000862 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000863 self.assertRaises(OSError, os.execvpe, 'no such app-',
864 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +0000865
Thomas Heller6790d602007-08-30 17:15:14 +0000866 def test_execvpe_with_bad_arglist(self):
867 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
868
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000869 @unittest.skipUnless(hasattr(os, '_execvpe'),
870 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +0000871 def _test_internal_execvpe(self, test_type):
872 program_path = os.sep + 'absolutepath'
873 if test_type is bytes:
874 program = b'executable'
875 fullpath = os.path.join(os.fsencode(program_path), program)
876 native_fullpath = fullpath
877 arguments = [b'progname', 'arg1', 'arg2']
878 else:
879 program = 'executable'
880 arguments = ['progname', 'arg1', 'arg2']
881 fullpath = os.path.join(program_path, program)
882 if os.name != "nt":
883 native_fullpath = os.fsencode(fullpath)
884 else:
885 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000886 env = {'spam': 'beans'}
887
Victor Stinnerb745a742010-05-18 17:17:23 +0000888 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000889 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000890 self.assertRaises(RuntimeError,
891 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000892 self.assertEqual(len(calls), 1)
893 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
894
Victor Stinnerb745a742010-05-18 17:17:23 +0000895 # test os._execvpe() with a relative path:
896 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000897 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000898 self.assertRaises(OSError,
899 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000900 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +0000901 self.assertSequenceEqual(calls[0],
902 ('execve', native_fullpath, (arguments, env)))
903
904 # test os._execvpe() with a relative path:
905 # os.get_exec_path() reads the 'PATH' variable
906 with _execvpe_mockup() as calls:
907 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +0000908 if test_type is bytes:
909 env_path[b'PATH'] = program_path
910 else:
911 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +0000912 self.assertRaises(OSError,
913 os._execvpe, program, arguments, env=env_path)
914 self.assertEqual(len(calls), 1)
915 self.assertSequenceEqual(calls[0],
916 ('execve', native_fullpath, (arguments, env_path)))
917
918 def test_internal_execvpe_str(self):
919 self._test_internal_execvpe(str)
920 if os.name != "nt":
921 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000922
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000923
Thomas Wouters477c8d52006-05-27 19:21:47 +0000924class Win32ErrorTests(unittest.TestCase):
925 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000926 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000927
928 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000929 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000930
931 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000932 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000933
934 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000935 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +0000936 try:
937 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
938 finally:
939 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000940 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000941
942 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000943 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000944
Thomas Wouters477c8d52006-05-27 19:21:47 +0000945 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +0000946 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000947
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000948class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000949 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000950 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
951 #singles.append("close")
952 #We omit close because it doesn'r raise an exception on some platforms
953 def get_single(f):
954 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000955 if hasattr(os, f):
956 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000957 return helper
958 for f in singles:
959 locals()["test_"+f] = get_single(f)
960
Benjamin Peterson7522c742009-01-19 21:00:09 +0000961 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000962 try:
963 f(support.make_bad_fd(), *args)
964 except OSError as e:
965 self.assertEqual(e.errno, errno.EBADF)
966 else:
967 self.fail("%r didn't raise a OSError with a bad file descriptor"
968 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000969
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000970 def test_isatty(self):
971 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000972 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000973
974 def test_closerange(self):
975 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000976 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000977 # Make sure none of the descriptors we are about to close are
978 # currently valid (issue 6542).
979 for i in range(10):
980 try: os.fstat(fd+i)
981 except OSError:
982 pass
983 else:
984 break
985 if i < 2:
986 raise unittest.SkipTest(
987 "Unable to acquire a range of invalid file descriptors")
988 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000989
990 def test_dup2(self):
991 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000992 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000993
994 def test_fchmod(self):
995 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000996 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000997
998 def test_fchown(self):
999 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001000 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001001
1002 def test_fpathconf(self):
1003 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001004 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001005
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001006 def test_ftruncate(self):
1007 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001008 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001009
1010 def test_lseek(self):
1011 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001012 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001013
1014 def test_read(self):
1015 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001016 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001017
1018 def test_tcsetpgrpt(self):
1019 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001020 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001021
1022 def test_write(self):
1023 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001024 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001025
Brian Curtin1b9df392010-11-24 20:24:31 +00001026
1027class LinkTests(unittest.TestCase):
1028 def setUp(self):
1029 self.file1 = support.TESTFN
1030 self.file2 = os.path.join(support.TESTFN + "2")
1031
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001032 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001033 for file in (self.file1, self.file2):
1034 if os.path.exists(file):
1035 os.unlink(file)
1036
Brian Curtin1b9df392010-11-24 20:24:31 +00001037 def _test_link(self, file1, file2):
1038 with open(file1, "w") as f1:
1039 f1.write("test")
1040
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001041 with warnings.catch_warnings():
1042 warnings.simplefilter("ignore", DeprecationWarning)
1043 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001044 with open(file1, "r") as f1, open(file2, "r") as f2:
1045 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1046
1047 def test_link(self):
1048 self._test_link(self.file1, self.file2)
1049
1050 def test_link_bytes(self):
1051 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1052 bytes(self.file2, sys.getfilesystemencoding()))
1053
Brian Curtinf498b752010-11-30 15:54:04 +00001054 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001055 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001056 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001057 except UnicodeError:
1058 raise unittest.SkipTest("Unable to encode for this platform.")
1059
Brian Curtinf498b752010-11-30 15:54:04 +00001060 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001061 self.file2 = self.file1 + "2"
1062 self._test_link(self.file1, self.file2)
1063
Thomas Wouters477c8d52006-05-27 19:21:47 +00001064if sys.platform != 'win32':
1065 class Win32ErrorTests(unittest.TestCase):
1066 pass
1067
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001068 class PosixUidGidTests(unittest.TestCase):
1069 if hasattr(os, 'setuid'):
1070 def test_setuid(self):
1071 if os.getuid() != 0:
1072 self.assertRaises(os.error, os.setuid, 0)
1073 self.assertRaises(OverflowError, os.setuid, 1<<32)
1074
1075 if hasattr(os, 'setgid'):
1076 def test_setgid(self):
1077 if os.getuid() != 0:
1078 self.assertRaises(os.error, os.setgid, 0)
1079 self.assertRaises(OverflowError, os.setgid, 1<<32)
1080
1081 if hasattr(os, 'seteuid'):
1082 def test_seteuid(self):
1083 if os.getuid() != 0:
1084 self.assertRaises(os.error, os.seteuid, 0)
1085 self.assertRaises(OverflowError, os.seteuid, 1<<32)
1086
1087 if hasattr(os, 'setegid'):
1088 def test_setegid(self):
1089 if os.getuid() != 0:
1090 self.assertRaises(os.error, os.setegid, 0)
1091 self.assertRaises(OverflowError, os.setegid, 1<<32)
1092
1093 if hasattr(os, 'setreuid'):
1094 def test_setreuid(self):
1095 if os.getuid() != 0:
1096 self.assertRaises(os.error, os.setreuid, 0, 0)
1097 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1098 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001099
1100 def test_setreuid_neg1(self):
1101 # Needs to accept -1. We run this in a subprocess to avoid
1102 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001103 subprocess.check_call([
1104 sys.executable, '-c',
1105 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001106
1107 if hasattr(os, 'setregid'):
1108 def test_setregid(self):
1109 if os.getuid() != 0:
1110 self.assertRaises(os.error, os.setregid, 0, 0)
1111 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1112 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001113
1114 def test_setregid_neg1(self):
1115 # Needs to accept -1. We run this in a subprocess to avoid
1116 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001117 subprocess.check_call([
1118 sys.executable, '-c',
1119 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001120
1121 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001122 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001123 if support.TESTFN_UNENCODABLE:
1124 self.dir = support.TESTFN_UNENCODABLE
1125 else:
1126 self.dir = support.TESTFN
1127 self.bdir = os.fsencode(self.dir)
1128
1129 bytesfn = []
1130 def add_filename(fn):
1131 try:
1132 fn = os.fsencode(fn)
1133 except UnicodeEncodeError:
1134 return
1135 bytesfn.append(fn)
1136 add_filename(support.TESTFN_UNICODE)
1137 if support.TESTFN_UNENCODABLE:
1138 add_filename(support.TESTFN_UNENCODABLE)
1139 if not bytesfn:
1140 self.skipTest("couldn't create any non-ascii filename")
1141
1142 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001143 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001144 try:
1145 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +02001146 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +00001147 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001148 if fn in self.unicodefn:
1149 raise ValueError("duplicate filename")
1150 self.unicodefn.add(fn)
1151 except:
1152 shutil.rmtree(self.dir)
1153 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001154
1155 def tearDown(self):
1156 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001157
1158 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001159 expected = self.unicodefn
1160 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001161 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001162
1163 def test_open(self):
1164 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001165 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +00001166 f.close()
1167
1168 def test_stat(self):
1169 for fn in self.unicodefn:
1170 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001171else:
1172 class PosixUidGidTests(unittest.TestCase):
1173 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001174 class Pep383Tests(unittest.TestCase):
1175 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001176
Brian Curtineb24d742010-04-12 17:16:38 +00001177@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1178class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001179 def _kill(self, sig):
1180 # Start sys.executable as a subprocess and communicate from the
1181 # subprocess to the parent that the interpreter is ready. When it
1182 # becomes ready, send *sig* via os.kill to the subprocess and check
1183 # that the return code is equal to *sig*.
1184 import ctypes
1185 from ctypes import wintypes
1186 import msvcrt
1187
1188 # Since we can't access the contents of the process' stdout until the
1189 # process has exited, use PeekNamedPipe to see what's inside stdout
1190 # without waiting. This is done so we can tell that the interpreter
1191 # is started and running at a point where it could handle a signal.
1192 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1193 PeekNamedPipe.restype = wintypes.BOOL
1194 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1195 ctypes.POINTER(ctypes.c_char), # stdout buf
1196 wintypes.DWORD, # Buffer size
1197 ctypes.POINTER(wintypes.DWORD), # bytes read
1198 ctypes.POINTER(wintypes.DWORD), # bytes avail
1199 ctypes.POINTER(wintypes.DWORD)) # bytes left
1200 msg = "running"
1201 proc = subprocess.Popen([sys.executable, "-c",
1202 "import sys;"
1203 "sys.stdout.write('{}');"
1204 "sys.stdout.flush();"
1205 "input()".format(msg)],
1206 stdout=subprocess.PIPE,
1207 stderr=subprocess.PIPE,
1208 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001209 self.addCleanup(proc.stdout.close)
1210 self.addCleanup(proc.stderr.close)
1211 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001212
1213 count, max = 0, 100
1214 while count < max and proc.poll() is None:
1215 # Create a string buffer to store the result of stdout from the pipe
1216 buf = ctypes.create_string_buffer(len(msg))
1217 # Obtain the text currently in proc.stdout
1218 # Bytes read/avail/left are left as NULL and unused
1219 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1220 buf, ctypes.sizeof(buf), None, None, None)
1221 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1222 if buf.value:
1223 self.assertEqual(msg, buf.value.decode())
1224 break
1225 time.sleep(0.1)
1226 count += 1
1227 else:
1228 self.fail("Did not receive communication from the subprocess")
1229
Brian Curtineb24d742010-04-12 17:16:38 +00001230 os.kill(proc.pid, sig)
1231 self.assertEqual(proc.wait(), sig)
1232
1233 def test_kill_sigterm(self):
1234 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001235 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001236
1237 def test_kill_int(self):
1238 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001239 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001240
1241 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001242 tagname = "test_os_%s" % uuid.uuid1()
1243 m = mmap.mmap(-1, 1, tagname)
1244 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001245 # Run a script which has console control handling enabled.
1246 proc = subprocess.Popen([sys.executable,
1247 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001248 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001249 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1250 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001251 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001252 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001253 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001254 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001255 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001256 count += 1
1257 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001258 # Forcefully kill the process if we weren't able to signal it.
1259 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001260 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001261 os.kill(proc.pid, event)
1262 # proc.send_signal(event) could also be done here.
1263 # Allow time for the signal to be passed and the process to exit.
1264 time.sleep(0.5)
1265 if not proc.poll():
1266 # Forcefully kill the process if we weren't able to signal it.
1267 os.kill(proc.pid, signal.SIGINT)
1268 self.fail("subprocess did not stop on {}".format(name))
1269
1270 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1271 def test_CTRL_C_EVENT(self):
1272 from ctypes import wintypes
1273 import ctypes
1274
1275 # Make a NULL value by creating a pointer with no argument.
1276 NULL = ctypes.POINTER(ctypes.c_int)()
1277 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1278 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1279 wintypes.BOOL)
1280 SetConsoleCtrlHandler.restype = wintypes.BOOL
1281
1282 # Calling this with NULL and FALSE causes the calling process to
1283 # handle CTRL+C, rather than ignore it. This property is inherited
1284 # by subprocesses.
1285 SetConsoleCtrlHandler(NULL, 0)
1286
1287 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1288
1289 def test_CTRL_BREAK_EVENT(self):
1290 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1291
1292
Brian Curtind40e6f72010-07-08 21:39:08 +00001293@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001294@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001295class Win32SymlinkTests(unittest.TestCase):
1296 filelink = 'filelinktest'
1297 filelink_target = os.path.abspath(__file__)
1298 dirlink = 'dirlinktest'
1299 dirlink_target = os.path.dirname(filelink_target)
1300 missing_link = 'missing link'
1301
1302 def setUp(self):
1303 assert os.path.exists(self.dirlink_target)
1304 assert os.path.exists(self.filelink_target)
1305 assert not os.path.exists(self.dirlink)
1306 assert not os.path.exists(self.filelink)
1307 assert not os.path.exists(self.missing_link)
1308
1309 def tearDown(self):
1310 if os.path.exists(self.filelink):
1311 os.remove(self.filelink)
1312 if os.path.exists(self.dirlink):
1313 os.rmdir(self.dirlink)
1314 if os.path.lexists(self.missing_link):
1315 os.remove(self.missing_link)
1316
1317 def test_directory_link(self):
Antoine Pitrou5311c1d2012-01-24 08:59:28 +01001318 os.symlink(self.dirlink_target, self.dirlink, True)
Brian Curtind40e6f72010-07-08 21:39:08 +00001319 self.assertTrue(os.path.exists(self.dirlink))
1320 self.assertTrue(os.path.isdir(self.dirlink))
1321 self.assertTrue(os.path.islink(self.dirlink))
1322 self.check_stat(self.dirlink, self.dirlink_target)
1323
1324 def test_file_link(self):
1325 os.symlink(self.filelink_target, self.filelink)
1326 self.assertTrue(os.path.exists(self.filelink))
1327 self.assertTrue(os.path.isfile(self.filelink))
1328 self.assertTrue(os.path.islink(self.filelink))
1329 self.check_stat(self.filelink, self.filelink_target)
1330
1331 def _create_missing_dir_link(self):
1332 'Create a "directory" link to a non-existent target'
1333 linkname = self.missing_link
1334 if os.path.lexists(linkname):
1335 os.remove(linkname)
1336 target = r'c:\\target does not exist.29r3c740'
1337 assert not os.path.exists(target)
1338 target_is_dir = True
1339 os.symlink(target, linkname, target_is_dir)
1340
1341 def test_remove_directory_link_to_missing_target(self):
1342 self._create_missing_dir_link()
1343 # For compatibility with Unix, os.remove will check the
1344 # directory status and call RemoveDirectory if the symlink
1345 # was created with target_is_dir==True.
1346 os.remove(self.missing_link)
1347
1348 @unittest.skip("currently fails; consider for improvement")
1349 def test_isdir_on_directory_link_to_missing_target(self):
1350 self._create_missing_dir_link()
1351 # consider having isdir return true for directory links
1352 self.assertTrue(os.path.isdir(self.missing_link))
1353
1354 @unittest.skip("currently fails; consider for improvement")
1355 def test_rmdir_on_directory_link_to_missing_target(self):
1356 self._create_missing_dir_link()
1357 # consider allowing rmdir to remove directory links
1358 os.rmdir(self.missing_link)
1359
1360 def check_stat(self, link, target):
1361 self.assertEqual(os.stat(link), os.stat(target))
1362 self.assertNotEqual(os.lstat(link), os.stat(link))
1363
Brian Curtind25aef52011-06-13 15:16:04 -05001364 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001365 with warnings.catch_warnings():
1366 warnings.simplefilter("ignore", DeprecationWarning)
1367 self.assertEqual(os.stat(bytes_link), os.stat(target))
1368 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001369
1370 def test_12084(self):
1371 level1 = os.path.abspath(support.TESTFN)
1372 level2 = os.path.join(level1, "level2")
1373 level3 = os.path.join(level2, "level3")
1374 try:
1375 os.mkdir(level1)
1376 os.mkdir(level2)
1377 os.mkdir(level3)
1378
1379 file1 = os.path.abspath(os.path.join(level1, "file1"))
1380
1381 with open(file1, "w") as f:
1382 f.write("file1")
1383
1384 orig_dir = os.getcwd()
1385 try:
1386 os.chdir(level2)
1387 link = os.path.join(level2, "link")
1388 os.symlink(os.path.relpath(file1), "link")
1389 self.assertIn("link", os.listdir(os.getcwd()))
1390
1391 # Check os.stat calls from the same dir as the link
1392 self.assertEqual(os.stat(file1), os.stat("link"))
1393
1394 # Check os.stat calls from a dir below the link
1395 os.chdir(level1)
1396 self.assertEqual(os.stat(file1),
1397 os.stat(os.path.relpath(link)))
1398
1399 # Check os.stat calls from a dir above the link
1400 os.chdir(level3)
1401 self.assertEqual(os.stat(file1),
1402 os.stat(os.path.relpath(link)))
1403 finally:
1404 os.chdir(orig_dir)
1405 except OSError as err:
1406 self.fail(err)
1407 finally:
1408 os.remove(file1)
1409 shutil.rmtree(level1)
1410
Brian Curtind40e6f72010-07-08 21:39:08 +00001411
Victor Stinnere8d51452010-08-19 01:05:19 +00001412class FSEncodingTests(unittest.TestCase):
1413 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001414 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1415 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001416
Victor Stinnere8d51452010-08-19 01:05:19 +00001417 def test_identity(self):
1418 # assert fsdecode(fsencode(x)) == x
1419 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1420 try:
1421 bytesfn = os.fsencode(fn)
1422 except UnicodeEncodeError:
1423 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001424 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001425
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001426
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001427class PidTests(unittest.TestCase):
1428 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1429 def test_getppid(self):
1430 p = subprocess.Popen([sys.executable, '-c',
1431 'import os; print(os.getppid())'],
1432 stdout=subprocess.PIPE)
1433 stdout, _ = p.communicate()
1434 # We are the parent of our subprocess
1435 self.assertEqual(int(stdout), os.getpid())
1436
1437
Brian Curtin0151b8e2010-09-24 13:43:43 +00001438# The introduction of this TestCase caused at least two different errors on
1439# *nix buildbots. Temporarily skip this to let the buildbots move along.
1440@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001441@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1442class LoginTests(unittest.TestCase):
1443 def test_getlogin(self):
1444 user_name = os.getlogin()
1445 self.assertNotEqual(len(user_name), 0)
1446
1447
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001448@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1449 "needs os.getpriority and os.setpriority")
1450class ProgramPriorityTests(unittest.TestCase):
1451 """Tests for os.getpriority() and os.setpriority()."""
1452
1453 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001454
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001455 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1456 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1457 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001458 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1459 if base >= 19 and new_prio <= 19:
1460 raise unittest.SkipTest(
1461 "unable to reliably test setpriority at current nice level of %s" % base)
1462 else:
1463 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001464 finally:
1465 try:
1466 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1467 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001468 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001469 raise
1470
1471
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001472if threading is not None:
1473 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001474
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001475 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001476
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001477 def __init__(self, conn):
1478 asynchat.async_chat.__init__(self, conn)
1479 self.in_buffer = []
1480 self.closed = False
1481 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001482
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001483 def handle_read(self):
1484 data = self.recv(4096)
1485 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001486
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001487 def get_data(self):
1488 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001489
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001490 def handle_close(self):
1491 self.close()
1492 self.closed = True
1493
1494 def handle_error(self):
1495 raise
1496
1497 def __init__(self, address):
1498 threading.Thread.__init__(self)
1499 asyncore.dispatcher.__init__(self)
1500 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1501 self.bind(address)
1502 self.listen(5)
1503 self.host, self.port = self.socket.getsockname()[:2]
1504 self.handler_instance = None
1505 self._active = False
1506 self._active_lock = threading.Lock()
1507
1508 # --- public API
1509
1510 @property
1511 def running(self):
1512 return self._active
1513
1514 def start(self):
1515 assert not self.running
1516 self.__flag = threading.Event()
1517 threading.Thread.start(self)
1518 self.__flag.wait()
1519
1520 def stop(self):
1521 assert self.running
1522 self._active = False
1523 self.join()
1524
1525 def wait(self):
1526 # wait for handler connection to be closed, then stop the server
1527 while not getattr(self.handler_instance, "closed", False):
1528 time.sleep(0.001)
1529 self.stop()
1530
1531 # --- internals
1532
1533 def run(self):
1534 self._active = True
1535 self.__flag.set()
1536 while self._active and asyncore.socket_map:
1537 self._active_lock.acquire()
1538 asyncore.loop(timeout=0.001, count=1)
1539 self._active_lock.release()
1540 asyncore.close_all()
1541
1542 def handle_accept(self):
1543 conn, addr = self.accept()
1544 self.handler_instance = self.Handler(conn)
1545
1546 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001547 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001548 handle_read = handle_connect
1549
1550 def writable(self):
1551 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001552
1553 def handle_error(self):
1554 raise
1555
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001556
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001557@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001558@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1559class TestSendfile(unittest.TestCase):
1560
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001561 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001562 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001563 not sys.platform.startswith("solaris") and \
1564 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001565
1566 @classmethod
1567 def setUpClass(cls):
1568 with open(support.TESTFN, "wb") as f:
1569 f.write(cls.DATA)
1570
1571 @classmethod
1572 def tearDownClass(cls):
1573 support.unlink(support.TESTFN)
1574
1575 def setUp(self):
1576 self.server = SendfileTestServer((support.HOST, 0))
1577 self.server.start()
1578 self.client = socket.socket()
1579 self.client.connect((self.server.host, self.server.port))
1580 self.client.settimeout(1)
1581 # synchronize by waiting for "220 ready" response
1582 self.client.recv(1024)
1583 self.sockno = self.client.fileno()
1584 self.file = open(support.TESTFN, 'rb')
1585 self.fileno = self.file.fileno()
1586
1587 def tearDown(self):
1588 self.file.close()
1589 self.client.close()
1590 if self.server.running:
1591 self.server.stop()
1592
1593 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1594 """A higher level wrapper representing how an application is
1595 supposed to use sendfile().
1596 """
1597 while 1:
1598 try:
1599 if self.SUPPORT_HEADERS_TRAILERS:
1600 return os.sendfile(sock, file, offset, nbytes, headers,
1601 trailers)
1602 else:
1603 return os.sendfile(sock, file, offset, nbytes)
1604 except OSError as err:
1605 if err.errno == errno.ECONNRESET:
1606 # disconnected
1607 raise
1608 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1609 # we have to retry send data
1610 continue
1611 else:
1612 raise
1613
1614 def test_send_whole_file(self):
1615 # normal send
1616 total_sent = 0
1617 offset = 0
1618 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001619 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001620 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1621 if sent == 0:
1622 break
1623 offset += sent
1624 total_sent += sent
1625 self.assertTrue(sent <= nbytes)
1626 self.assertEqual(offset, total_sent)
1627
1628 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001629 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001630 self.client.close()
1631 self.server.wait()
1632 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001633 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001634 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001635
1636 def test_send_at_certain_offset(self):
1637 # start sending a file at a certain offset
1638 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001639 offset = len(self.DATA) // 2
1640 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001641 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001642 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001643 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1644 if sent == 0:
1645 break
1646 offset += sent
1647 total_sent += sent
1648 self.assertTrue(sent <= nbytes)
1649
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001650 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001651 self.client.close()
1652 self.server.wait()
1653 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001654 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001655 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001656 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001657 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001658
1659 def test_offset_overflow(self):
1660 # specify an offset > file size
1661 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001662 try:
1663 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1664 except OSError as e:
1665 # Solaris can raise EINVAL if offset >= file length, ignore.
1666 if e.errno != errno.EINVAL:
1667 raise
1668 else:
1669 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001670 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001671 self.client.close()
1672 self.server.wait()
1673 data = self.server.handler_instance.get_data()
1674 self.assertEqual(data, b'')
1675
1676 def test_invalid_offset(self):
1677 with self.assertRaises(OSError) as cm:
1678 os.sendfile(self.sockno, self.fileno, -1, 4096)
1679 self.assertEqual(cm.exception.errno, errno.EINVAL)
1680
1681 # --- headers / trailers tests
1682
1683 if SUPPORT_HEADERS_TRAILERS:
1684
1685 def test_headers(self):
1686 total_sent = 0
1687 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1688 headers=[b"x" * 512])
1689 total_sent += sent
1690 offset = 4096
1691 nbytes = 4096
1692 while 1:
1693 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1694 offset, nbytes)
1695 if sent == 0:
1696 break
1697 total_sent += sent
1698 offset += sent
1699
1700 expected_data = b"x" * 512 + self.DATA
1701 self.assertEqual(total_sent, len(expected_data))
1702 self.client.close()
1703 self.server.wait()
1704 data = self.server.handler_instance.get_data()
1705 self.assertEqual(hash(data), hash(expected_data))
1706
1707 def test_trailers(self):
1708 TESTFN2 = support.TESTFN + "2"
Brett Cannonb6376802011-03-15 17:38:22 -04001709 with open(TESTFN2, 'wb') as f:
1710 f.write(b"abcde")
1711 with open(TESTFN2, 'rb')as f:
1712 self.addCleanup(os.remove, TESTFN2)
1713 os.sendfile(self.sockno, f.fileno(), 0, 4096,
1714 trailers=[b"12345"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001715 self.client.close()
1716 self.server.wait()
1717 data = self.server.handler_instance.get_data()
1718 self.assertEqual(data, b"abcde12345")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001719
1720 if hasattr(os, "SF_NODISKIO"):
1721 def test_flags(self):
1722 try:
1723 os.sendfile(self.sockno, self.fileno, 0, 4096,
1724 flags=os.SF_NODISKIO)
1725 except OSError as err:
1726 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1727 raise
1728
1729
Benjamin Peterson799bd802011-08-31 22:15:17 -04001730def supports_extended_attributes():
1731 if not hasattr(os, "setxattr"):
1732 return False
1733 try:
1734 with open(support.TESTFN, "wb") as fp:
1735 try:
1736 os.fsetxattr(fp.fileno(), b"user.test", b"")
1737 except OSError as e:
1738 if e.errno != errno.ENOTSUP:
1739 raise
1740 return False
1741 finally:
1742 support.unlink(support.TESTFN)
1743 # Kernels < 2.6.39 don't respect setxattr flags.
1744 kernel_version = platform.release()
1745 m = re.match("2.6.(\d{1,2})", kernel_version)
1746 return m is None or int(m.group(1)) >= 39
1747
1748
1749@unittest.skipUnless(supports_extended_attributes(),
1750 "no non-broken extended attribute support")
1751class ExtendedAttributeTests(unittest.TestCase):
1752
1753 def tearDown(self):
1754 support.unlink(support.TESTFN)
1755
1756 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
1757 fn = support.TESTFN
1758 open(fn, "wb").close()
1759 with self.assertRaises(OSError) as cm:
1760 getxattr(fn, s("user.test"))
1761 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001762 init_xattr = listxattr(fn)
1763 self.assertIsInstance(init_xattr, list)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001764 setxattr(fn, s("user.test"), b"")
Victor Stinnerf12e5062011-10-16 22:12:03 +02001765 xattr = set(init_xattr)
1766 xattr.add("user.test")
1767 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001768 self.assertEqual(getxattr(fn, b"user.test"), b"")
1769 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE)
1770 self.assertEqual(getxattr(fn, b"user.test"), b"hello")
1771 with self.assertRaises(OSError) as cm:
1772 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE)
1773 self.assertEqual(cm.exception.errno, errno.EEXIST)
1774 with self.assertRaises(OSError) as cm:
1775 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE)
1776 self.assertEqual(cm.exception.errno, errno.ENODATA)
1777 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001778 xattr.add("user.test2")
1779 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001780 removexattr(fn, s("user.test"))
1781 with self.assertRaises(OSError) as cm:
1782 getxattr(fn, s("user.test"))
1783 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001784 xattr.remove("user.test")
1785 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001786 self.assertEqual(getxattr(fn, s("user.test2")), b"foo")
1787 setxattr(fn, s("user.test"), b"a"*1024)
1788 self.assertEqual(getxattr(fn, s("user.test")), b"a"*1024)
1789 removexattr(fn, s("user.test"))
1790 many = sorted("user.test{}".format(i) for i in range(100))
1791 for thing in many:
1792 setxattr(fn, thing, b"x")
Victor Stinnerf12e5062011-10-16 22:12:03 +02001793 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04001794
1795 def _check_xattrs(self, *args):
1796 def make_bytes(s):
1797 return bytes(s, "ascii")
1798 self._check_xattrs_str(str, *args)
1799 support.unlink(support.TESTFN)
1800 self._check_xattrs_str(make_bytes, *args)
1801
1802 def test_simple(self):
1803 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
1804 os.listxattr)
1805
1806 def test_lpath(self):
1807 self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
1808 os.llistxattr)
1809
1810 def test_fds(self):
1811 def getxattr(path, *args):
1812 with open(path, "rb") as fp:
1813 return os.fgetxattr(fp.fileno(), *args)
1814 def setxattr(path, *args):
1815 with open(path, "wb") as fp:
1816 os.fsetxattr(fp.fileno(), *args)
1817 def removexattr(path, *args):
1818 with open(path, "wb") as fp:
1819 os.fremovexattr(fp.fileno(), *args)
1820 def listxattr(path, *args):
1821 with open(path, "rb") as fp:
1822 return os.flistxattr(fp.fileno(), *args)
1823 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1824
1825
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001826@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1827class Win32DeprecatedBytesAPI(unittest.TestCase):
1828 def test_deprecated(self):
1829 import nt
1830 filename = os.fsencode(support.TESTFN)
1831 with warnings.catch_warnings():
1832 warnings.simplefilter("error", DeprecationWarning)
1833 for func, *args in (
1834 (nt._getfullpathname, filename),
1835 (nt._isdir, filename),
1836 (os.access, filename, os.R_OK),
1837 (os.chdir, filename),
1838 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01001839 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001840 (os.link, filename, filename),
1841 (os.listdir, filename),
1842 (os.lstat, filename),
1843 (os.mkdir, filename),
1844 (os.open, filename, os.O_RDONLY),
1845 (os.rename, filename, filename),
1846 (os.rmdir, filename),
1847 (os.startfile, filename),
1848 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001849 (os.unlink, filename),
1850 (os.utime, filename),
1851 ):
1852 self.assertRaises(DeprecationWarning, func, *args)
1853
Victor Stinner28216442011-11-16 00:34:44 +01001854 @support.skip_unless_symlink
1855 def test_symlink(self):
1856 filename = os.fsencode(support.TESTFN)
1857 with warnings.catch_warnings():
1858 warnings.simplefilter("error", DeprecationWarning)
1859 self.assertRaises(DeprecationWarning,
1860 os.symlink, filename, filename)
1861
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001862
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001863@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
1864class TermsizeTests(unittest.TestCase):
1865 def test_does_not_crash(self):
1866 """Check if get_terminal_size() returns a meaningful value.
1867
1868 There's no easy portable way to actually check the size of the
1869 terminal, so let's check if it returns something sensible instead.
1870 """
1871 try:
1872 size = os.get_terminal_size()
1873 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01001874 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001875 # Under win32 a generic OSError can be thrown if the
1876 # handle cannot be retrieved
1877 self.skipTest("failed to query terminal size")
1878 raise
1879
Antoine Pitroucfade362012-02-08 23:48:59 +01001880 self.assertGreaterEqual(size.columns, 0)
1881 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001882
1883 def test_stty_match(self):
1884 """Check if stty returns the same results
1885
1886 stty actually tests stdin, so get_terminal_size is invoked on
1887 stdin explicitly. If stty succeeded, then get_terminal_size()
1888 should work too.
1889 """
1890 try:
1891 size = subprocess.check_output(['stty', 'size']).decode().split()
1892 except (FileNotFoundError, subprocess.CalledProcessError):
1893 self.skipTest("stty invocation failed")
1894 expected = (int(size[1]), int(size[0])) # reversed order
1895
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01001896 try:
1897 actual = os.get_terminal_size(sys.__stdin__.fileno())
1898 except OSError as e:
1899 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
1900 # Under win32 a generic OSError can be thrown if the
1901 # handle cannot be retrieved
1902 self.skipTest("failed to query terminal size")
1903 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001904 self.assertEqual(expected, actual)
1905
1906
Antoine Pitrouf26ad712011-07-15 23:00:56 +02001907@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00001908def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001909 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00001910 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001911 StatAttributeTests,
1912 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001913 WalkTests,
Charles-François Natali7372b062012-02-05 15:15:38 +01001914 FwalkTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001915 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001916 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00001917 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00001918 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001919 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001920 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00001921 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00001922 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001923 Win32KillTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00001924 Win32SymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00001925 FSEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001926 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001927 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00001928 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001929 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001930 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04001931 ExtendedAttributeTests,
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001932 Win32DeprecatedBytesAPI,
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001933 TermsizeTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001934 )
Fred Drake2e2be372001-09-20 21:33:42 +00001935
1936if __name__ == "__main__":
1937 test_main()