blob: 4d27c2b3145236bdb9b4a66a74c943b2fc759dcc [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000025try:
26 import threading
27except ImportError:
28 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000029
# Ask os.stat() for float timestamps, then stat this very file to find out
# whether the platform reports sub-second resolution: it does as soon as one
# float attribute differs from the integer stored at the matching tuple index.
os.stat_float_times(True)
st = os.stat(__file__)
stat_supports_subsecond = any(
    getattr(st, attr) != st[index]
    for attr, index in (("st_atime", 7), ("st_mtime", 8), ("st_ctime", 9)))
37
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
#
# The chain short-circuits to a falsy value when sys.thread_info is missing
# or its version is empty; bool() normalizes that to False.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))
Brian Curtineb24d742010-04-12 17:16:38 +000046
Thomas Wouters0e3f5912006-08-11 14:57:12 +000047# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file functions; each test creates TESTFN."""

    def setUp(self):
        # Remove a leftover TESTFN.  The same function doubles as tearDown
        # (see the alias below), so every test starts and ends clean.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() returns exactly the bytes written through the file
        # object, as a bytes instance.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even if an assertion or a write above
            # fails, so a failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Helper: run the command given by *args* and require exit code 0.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Helper: open TESTFN read-only, wrap the descriptor with
        # os.fdopen(fd, *args) and close the resulting file object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination: afterwards
        # the source is gone and the destination holds the source's content.
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
153
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200154
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000155# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Test attributes on return values from the os.*stat* family."""

    def setUp(self):
        # Fixture: directory TESTFN holding one three-byte file, "f1".
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        # Shared body of the str and bytes filename variants below.
        if not hasattr(os, "stat"):
            return

        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # Time attributes may be floats while the indexed
                    # fields are integers; compare the truncated value.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        self.assertRaises(TypeError, os.stat_result, (10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but, unlike above, not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            return

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                return
            # Any other failure is a real error.  Swallowing it would only
            # defer the problem to a NameError on the unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        self.assertRaises(TypeError, os.statvfs_result, (10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def test_utime_noargs(self):
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st = os.stat(support.TESTFN)
        # Doesn't set anything new, but sets the time tuple way
        os.utime(support.TESTFN, (st.st_atime, st.st_mtime))
        # Set to the current time in the old explicit way.
        os.utime(support.TESTFN, None)
        st1 = os.stat(support.TESTFN)
        # Set to the current time in the new way
        os.utime(support.TESTFN)
        st2 = os.stat(support.TESTFN)
        self.assertAlmostEqual(st1.st_mtime, st2.st_mtime, delta=10)

    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        # Shared driver: reset both timestamps to 0, let
        # set_time_func(filename, atime, mtime) apply fractional times, then
        # check that os.stat() reports them to millisecond precision.
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimes'),
                         "os.futimes required for this test.")
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.futimes(f.fileno(), (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimens'),
                         "os.futimens required for this test.")
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                # futimens() takes (seconds, nanoseconds) pairs.
                asec, ansec = divmod(atime, 1.0)
                asec = int(asec)
                ansec = int(ansec * 1e9)
                msec, mnsec = divmod(mtime, 1.0)
                msec = int(msec)
                mnsec = int(mnsec * 1e9)
                os.futimens(f.fileno(),
                            (asec, ansec),
                            (msec, mnsec))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimesat'),
                         "os.futimesat required for this test.")
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.futimesat(dirfd, os.path.basename(filename),
                             (atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'lutimes'),
                         "os.lutimes required for this test.")
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.lutimes(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'utimensat'),
                         "os.utimensat required for this test.")
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # utimensat() takes (seconds, nanoseconds) pairs.
                asec, ansec = divmod(atime, 1.0)
                asec = int(asec)
                ansec = int(ansec * 1e9)
                msec, mnsec = divmod(mtime, 1.0)
                msec = int(msec)
                mnsec = int(mnsec * 1e9)
                os.utimensat(dirfd, os.path.basename(filename),
                             (asec, ansec),
                             (msec, mnsec))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    # NOTE(review): the nesting of the definitions below was reconstructed
    # from their content (WindowsError, c:\pagefile.sys, NTFS check) --
    # confirm against the upstream file.
    if sys.platform == 'win32':
        def get_file_system(path):
            # Return the file system name reported by
            # GetVolumeInformationW for the drive holding *path*; falls
            # through (returning None) when the call reports failure.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
425
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000426from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000427
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment (and its bytes twin where available) so
        # tearDown can restore it, then install the reference entries.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    # Report a skip instead of silently passing when there is no /bin/sh.
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    # Report a skip instead of silently passing when there is no /bin/sh.
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_os_popen_iter(self):
        with os.popen(
            "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
            it = iter(popen)
            self.assertEqual(next(it), "line1\n")
            self.assertEqual(next(it), "line2\n")
            self.assertEqual(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily swap os.environ for a plain dict; always restore it.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100568
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def setUp(self):
        # Only build the fixture tree here.  The traversal assertions live
        # in test_traversal(), so they are reported as a test of their own
        # rather than being re-run as part of every test's setUp.
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if support.can_symlink():
            if os.name == 'nt':
                def symlink_to_dir(src, dest):
                    os.symlink(src, dest, True)
            else:
                symlink_to_dir = os.symlink
            symlink_to_dir(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Remember what test_traversal() needs to check the walks.
        self.walk_path = walk_path
        self.sub1_path = sub1_path
        self.sub11_path = sub11_path
        self.link_path = link_path
        self.sub2_tree = sub2_tree

    def test_traversal(self):
        # Check top-down, pruned, bottom-up and symlink-following walks of
        # the tree built by setUp().
        walk_path = self.walk_path
        sub1_path = self.sub1_path
        sub11_path = self.sub11_path
        link_path = self.link_path
        sub2_tree = self.sub2_tree

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        # 'flipped' is a bool used as 0/1 in the index arithmetic below.
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlinked directory is removed as a link, not with rmdir.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000681
Charles-François Natali7372b062012-02-05 15:15:38 +0100682
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    # NOTE(review): os.flistdir(), os.unlinkat() and os.fstatat() are used
    # below exactly as in the original; confirm that the target interpreter
    # provides them.

    def _walk_argument_sets(self):
        # Yield positional-argument tuples (top, topdown, onerror,
        # followlinks) covering every topdown/followlinks combination.
        for topdown, followlinks in itertools.product((True, False), repeat=2):
            yield support.TESTFN, topdown, None, followlinks

    def test_compare_to_walk(self):
        # fwalk() must report the same directories and files as walk().
        for args in self._walk_argument_sets():
            expected = {}
            for root, dirs, files in os.walk(*args):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(*args):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_dir_fd(self):
        # Every yielded file descriptor must be usable and must refer to
        # the directory whose contents were reported.
        for args in self._walk_argument_sets():
            for root, dirs, files, rootfd in os.fwalk(*args):
                # check that the FD is valid
                os.fstat(rootfd)
                # check that flistdir() returns consistent information
                reported = set(dirs) | set(files)
                self.assertEqual(set(os.flistdir(rootfd)), reported)

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for _ in range(256):
            for _entry in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # Bottom-up removal of the test tree, addressing every entry
        # relative to the directory file descriptor yielded by fwalk().
        walker = os.fwalk(support.TESTFN, topdown=False)
        for root, dirs, files, rootfd in walker:
            for filename in files:
                os.unlinkat(rootfd, filename)
            for subdir in dirs:
                info = os.fstatat(rootfd, subdir, os.AT_SYMLINK_NOFOLLOW)
                if not stat.S_ISDIR(info.st_mode):
                    # Not a real directory (e.g. a symlink): plain unlink.
                    os.unlinkat(rootfd, subdir)
                else:
                    os.unlinkat(rootfd, subdir, os.AT_REMOVEDIR)
        os.rmdir(support.TESTFN)
734
735
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a scratch support.TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # The umask is process-wide state: restore it even when one of the
        # assertions below fails, so later tests are not affected.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Always delete the regular file: tearDown() uses os.removedirs(),
        # which cannot remove it, so leaving it behind after a failed
        # assertion would turn one failure into two.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
787
class DevNullTests(unittest.TestCase):
    """Tests for the null device named by os.devnull."""

    def test_devnull(self):
        # Writing to the null device must succeed; the ``with`` block
        # closes the file, so no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        # Reading from the null device yields no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000795
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom(self):
        # os.urandom(n) must return exactly n bytes.
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            # Report a skip instead of a silent pass, so a platform without
            # a randomness source is not counted as having been verified.
            self.skipTest("os.urandom() is not implemented on this platform")
805
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager that temporarily replaces os.execv and os.execve with
    recording stubs and, when *defpath* is not None, overrides os.defpath.

    Yields a list to which each stub call appends a tuple
    (function name, first argument, tuple of remaining positional args).
    The stubs never return: the execv stub raises RuntimeError and the
    execve stub raises OSError(ENOTDIR).  The original attributes are
    restored when the block exits.
    """
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    try:
        saved = (os.execv, os.execve, os.defpath)
        os.execv, os.execve = fake_execv, fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv, os.execve, os.defpath = saved
838
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        # A program name that does not exist must raise OSError.
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list is rejected with ValueError.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Exercise os._execvpe() with execv/execve stubbed out, using either
        # str or bytes program names depending on *test_type*.
        program_path = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # The path handed to execve is expected as bytes except on "nt".
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            self.assertRaises(RuntimeError,
                              os._execvpe, fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        # The bytes variant is only run when os.name is not "nt".
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000902
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000903
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise WindowsError for invalid operations
    on support.TESTFN."""

    def test_rename(self):
        with self.assertRaises(WindowsError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir() must fail when a regular file already has that name.
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(WindowsError):
                os.mkdir(support.TESTFN)
        finally:
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000927
class TestInvalidFD(unittest.TestCase):
    """Check how fd-based os functions react to an invalid file descriptor."""

    # Names of os functions called with just a file descriptor.  "close" is
    # deliberately left out because it doesn't raise an exception on some
    # platforms.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]

    def get_single(f):
        # Build a test method for the os function named *f*; the generated
        # test does nothing when os lacks that function.
        def helper(self):
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper
    # Install one generated test_<name> method per entry in singles.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError whose errno is EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    def test_isatty(self):
        if not hasattr(os, "isatty"):
            return
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    def test_closerange(self):
        if not hasattr(os, "closerange"):
            return
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    def test_dup2(self):
        if not hasattr(os, "dup2"):
            return
        self.check(os.dup2, 20)

    def test_fchmod(self):
        if not hasattr(os, "fchmod"):
            return
        self.check(os.fchmod, 0)

    def test_fchown(self):
        if not hasattr(os, "fchown"):
            return
        self.check(os.fchown, -1, -1)

    def test_fpathconf(self):
        if not hasattr(os, "fpathconf"):
            return
        self.check(os.fpathconf, "PC_NAME_MAX")

    def test_ftruncate(self):
        if not hasattr(os, "ftruncate"):
            return
        self.check(os.ftruncate, 0)

    def test_lseek(self):
        if not hasattr(os, "lseek"):
            return
        self.check(os.lseek, 0, 0)

    def test_read(self):
        if not hasattr(os, "read"):
            return
        self.check(os.read, 1)

    def test_tcsetpgrpt(self):
        if not hasattr(os, "tcsetpgrp"):
            return
        self.check(os.tcsetpgrp, 0)

    def test_write(self):
        if not hasattr(os, "write"):
            return
        self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001005
Brian Curtin1b9df392010-11-24 20:24:31 +00001006
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files the test managed to create.
        for leftover in (self.file1, self.file2):
            if os.path.exists(leftover):
                os.unlink(leftover)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it to file2 and verify that both names
        # open the same underlying file.
        with open(file1, "w") as source:
            source.write("test")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(bytes(self.file1, encoding),
                        bytes(self.file2, encoding))

    def test_unicode_name(self):
        # Skip when the file system encoding cannot represent the
        # character appended to the file name below.
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1043
if sys.platform != 'win32':
    # Rebinds the name to an empty test case on platforms other than win32.
    class Win32ErrorTests(unittest.TestCase):
        pass

    class PosixUidGidTests(unittest.TestCase):
        """Tests for the os.set*uid()/os.set*gid() family."""

        def _assert_denied_unless_root(self, func, *args):
            # When not running as uid 0, func(*args) must raise os.error.
            if os.getuid() != 0:
                self.assertRaises(os.error, func, *args)

        if hasattr(os, 'setuid'):
            def test_setuid(self):
                self._assert_denied_unless_root(os.setuid, 0)
                self.assertRaises(OverflowError, os.setuid, 1 << 32)

        if hasattr(os, 'setgid'):
            def test_setgid(self):
                self._assert_denied_unless_root(os.setgid, 0)
                self.assertRaises(OverflowError, os.setgid, 1 << 32)

        if hasattr(os, 'seteuid'):
            def test_seteuid(self):
                self._assert_denied_unless_root(os.seteuid, 0)
                self.assertRaises(OverflowError, os.seteuid, 1 << 32)

        if hasattr(os, 'setegid'):
            def test_setegid(self):
                self._assert_denied_unless_root(os.setegid, 0)
                self.assertRaises(OverflowError, os.setegid, 1 << 32)

        if hasattr(os, 'setreuid'):
            def test_setreuid(self):
                self._assert_denied_unless_root(os.setreuid, 0, 0)
                self.assertRaises(OverflowError, os.setreuid, 1 << 32, 0)
                self.assertRaises(OverflowError, os.setreuid, 0, 1 << 32)

            def test_setreuid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                script = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
                subprocess.check_call([sys.executable, '-c', script])

        if hasattr(os, 'setregid'):
            def test_setregid(self):
                self._assert_denied_unless_root(os.setregid, 0, 0)
                self.assertRaises(OverflowError, os.setregid, 1 << 32, 0)
                self.assertRaises(OverflowError, os.setregid, 0, 1 << 32)

            def test_setregid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                script = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
                subprocess.check_call([sys.executable, '-c', script])

    class Pep383Tests(unittest.TestCase):
        """Tests that non-ASCII file names can be listed, opened and
        stat'ed."""

        def setUp(self):
            self.dir = support.TESTFN_UNENCODABLE or support.TESTFN
            self.bdir = os.fsencode(self.dir)

            # Collect the bytes form of every candidate name that the file
            # system encoding is able to encode.
            bytesfn = []
            candidates = [support.TESTFN_UNICODE]
            if support.TESTFN_UNENCODABLE:
                candidates.append(support.TESTFN_UNENCODABLE)
            for candidate in candidates:
                try:
                    encoded = os.fsencode(candidate)
                except UnicodeEncodeError:
                    continue
                bytesfn.append(encoded)
            if not bytesfn:
                self.skipTest("couldn't create any non-ascii filename")

            self.unicodefn = set()
            os.mkdir(self.dir)
            try:
                for encoded in bytesfn:
                    support.create_empty_file(os.path.join(self.bdir, encoded))
                    decoded = os.fsdecode(encoded)
                    if decoded in self.unicodefn:
                        raise ValueError("duplicate filename")
                    self.unicodefn.add(decoded)
            except:
                # tearDown() is not run when setUp() fails, so remove the
                # directory here before re-raising.
                shutil.rmtree(self.dir)
                raise

        def tearDown(self):
            shutil.rmtree(self.dir)

        def test_listdir(self):
            found = set(os.listdir(self.dir))
            self.assertEqual(found, self.unicodefn)

        def test_open(self):
            for name in self.unicodefn:
                handle = open(os.path.join(self.dir, name), 'rb')
                handle.close()

        def test_stat(self):
            for name in self.unicodefn:
                os.stat(os.path.join(self.dir, name))
else:
    # Empty placeholders so both class names exist on win32 as well.
    class PosixUidGidTests(unittest.TestCase):
        pass

    class Pep383Tests(unittest.TestCase):
        pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001156
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1 seconds apart.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached on timeout or when the child exited before reporting.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start the console-handler helper script, wait until the shared
        # one-byte mapping reads 1, then send *event* and require the child
        # to exit.  *name* is only used in the failure message.
        # NOTE(review): presumably the helper script writes 1 into the
        # mapping once it is ready -- confirm in win_console_handler.py.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping once the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # Only do so when it is still running: this branch is also
            # reached when the child has already exited.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is running; an exit
        # status of 0 must not be mistaken for "still running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1271
1272
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic links to files and directories on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Pre-conditions use unittest assertions rather than bare ``assert``
        # statements so that they are still checked under ``python -O``.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): the missing link's target never exists.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink, True)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() of the link must equal stat() of the target and differ
        # from lstat() of the link, for both str and bytes link names.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so the cleanup below can always
        # refer to it, even when creating the directories fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so a cleanup error
            # cannot mask the original failure.
            if os.path.exists(file1):
                os.remove(file1)
            if os.path.isdir(level1):
                shutil.rmtree(level1)
1390
Brian Curtind40e6f72010-07-08 21:39:08 +00001391
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode and str given to fsdecode come back as-is
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # not representable in the filesystem encoding; nothing to check
                continue
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001405
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001406
class PidTests(unittest.TestCase):
    """Checks for process-id helpers in the os module."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # Ask a child interpreter for its parent pid and read the answer
        # back from its stdout.
        command = [sys.executable, '-c',
                   'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1416
1417
# Introducing this TestCase caused at least two different errors on the
# *nix buildbots, so it is skipped unconditionally for the time being to
# let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Checks for os.getlogin()."""

    def test_getlogin(self):
        # The login name must not be empty.
        name_length = len(os.getlogin())
        self.assertNotEqual(name_length, 0)
1426
1427
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                # Already at nice level 19 or above, so the increment
                # cannot be observed reliably.
                raise unittest.SkipTest(
                    "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to restore the original priority; an EACCES refusal is
            # tolerated, anything else is propagated.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1450
1451
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening asyncore server that runs its I/O loop in its own thread.

        It accepts a connection, greets the peer with b"220 ready\\r\\n" and
        records everything the peer sends so a test can inspect it through
        ``handler_instance.get_data()``.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that buffers every chunk it receives."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client can wait for to know the server is up.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return all bytes received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                # Flag polled by SendfileTestServer.wait().
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            """Bind to *address* and start listening; the thread is not started."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Record the address actually bound (the caller may pass port 0).
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True between run() starting and stop() being called."""
            return self._active

        def start(self):
            """Start the server thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to exit and wait for the thread to end."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            """Thread body: poll asyncore until stopped or no sockets remain."""
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # The with-statement releases the lock even if the poll raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        # Read events on the listening socket are handled the same way.
        handle_read = handle_connect

        def writable(self):
            # The listening socket never has anything to write.
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1535
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001536
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer.

    setUpClass writes DATA to support.TESTFN.  Each test connects a client
    socket to a fresh server, sends (part of) that file with os.sendfile()
    and compares what the server's handler received.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # Platforms listed here are treated as lacking the headers/trailers
    # arguments, so the wrapper and the conditional tests avoid them.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        # Tests that call server.wait() have already stopped the server.
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None, trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY and lets
        every other OSError (ECONNRESET included) propagate.  *headers*
        and *trailers* default to empty lists and are only forwarded when
        SUPPORT_HEADERS_TRAILERS is true.  Returns the byte count reported
        by os.sendfile().
        """
        # None instead of a shared mutable [] default.
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    # disconnected (ECONNRESET) or any other real error
                    raise
                # transient condition: we have to retry sending the data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare lengths first so a size mismatch is reported compactly.
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # a negative offset must be rejected with EINVAL
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            # Compare the payload itself rather than its hash; the length
            # check first keeps a size mismatch readable.
            self.assertEqual(len(data), len(expected_data))
            self.assertEqual(data, expected_data)

        def test_trailers(self):
            TESTFN2 = support.TESTFN + "2"
            # Register cleanup before creating the file so it is removed
            # even if a later step fails.
            self.addCleanup(support.unlink, TESTFN2)
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcde12345")

        if hasattr(os, "SF_NODISKIO"):
            def test_flags(self):
                # EBUSY/EAGAIN are acceptable outcomes with SF_NODISKIO.
                try:
                    os.sendfile(self.sockno, self.fileno, 0, 4096,
                                flags=os.SF_NODISKIO)
                except OSError as err:
                    if err.errno not in (errno.EBUSY, errno.EAGAIN):
                        raise
1708
1709
def supports_extended_attributes():
    """Return True if extended attributes are usable for the tests.

    Returns False when os has no setxattr, or when setting a "user.test"
    attribute on a scratch file (support.TESTFN) fails with ENOTSUP; any
    other OSError from that probe propagates.  Also returns False when
    platform.release() reports a 2.6.x kernel older than 2.6.39.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.fsetxattr(fp.fileno(), b"user.test", b"")
            except OSError as e:
                if e.errno != errno.ENOTSUP:
                    raise
                return False
    finally:
        # Always remove the scratch file, including on the early return.
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: match a literal "2.6.<minor>" prefix.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
1727
1728
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise the get/set/remove/list extended-attribute functions.

    The same scenario is run against the path-based, link-based and
    fd-based variants, once with str attribute names and once with bytes.
    """

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
        # *s* converts an attribute name to the type under test (str or bytes).
        path = support.TESTFN

        def expect_errno(expected, call, *call_args):
            # The call must raise OSError carrying exactly this errno.
            with self.assertRaises(OSError) as caught:
                call(*call_args)
            self.assertEqual(caught.exception.errno, expected)

        open(path, "wb").close()
        # Nothing has been set yet.
        expect_errno(errno.ENODATA, getxattr, path, s("user.test"))
        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create an attribute with an empty value, then replace its value.
        setxattr(path, s("user.test"), b"")
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test"), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE)
        self.assertEqual(getxattr(path, b"user.test"), b"hello")

        # XATTR_CREATE refuses an existing name; XATTR_REPLACE a missing one.
        expect_errno(errno.EEXIST,
                     setxattr, path, s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA,
                     setxattr, path, s("user.test2"), b"bye", os.XATTR_REPLACE)
        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removing one attribute leaves the other untouched.
        removexattr(path, s("user.test"))
        expect_errno(errno.ENODATA, getxattr, path, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2")), b"foo")

        # A 1 KiB value round-trips.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value)
        self.assertEqual(getxattr(path, s("user.test")), big_value)
        removexattr(path, s("user.test"))

        # Many attributes at once are all listed.
        many = sorted("user.test{}".format(i) for i in range(100))
        for name in many:
            setxattr(path, name, b"x")
        self.assertEqual(set(listxattr(path)), set(initial) | set(many))

    def _check_xattrs(self, *args):
        # Run the scenario with str names, then on a fresh file with bytes names.
        def to_ascii_bytes(text):
            return text.encode("ascii")

        self._check_xattrs_str(str, *args)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(to_ascii_bytes, *args)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
                           os.llistxattr)

    def test_fds(self):
        # Adapt the fd-based functions to the path-based calling convention
        # by opening the file around each call.
        def fd_getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.fgetxattr(fp.fileno(), *args)

        def fd_setxattr(path, *args):
            with open(path, "wb") as fp:
                os.fsetxattr(fp.fileno(), *args)

        def fd_removexattr(path, *args):
            with open(path, "wb") as fp:
                os.fremovexattr(fp.fileno(), *args)

        def fd_listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.flistxattr(fp.fileno(), *args)

        self._check_xattrs(fd_getxattr, fd_setxattr, fd_removexattr,
                           fd_listxattr)
1804
1805
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Bytes filenames must trigger DeprecationWarning in these calls."""

    def test_deprecated(self):
        import nt
        bytes_name = os.fsencode(support.TESTFN)
        # Each entry is a callable followed by the arguments to pass it.
        deprecated_calls = (
            (nt._getfullpathname, bytes_name),
            (nt._isdir, bytes_name),
            (os.access, bytes_name, os.R_OK),
            (os.chdir, bytes_name),
            (os.chmod, bytes_name, 0o777),
            (os.getcwdb,),
            (os.link, bytes_name, bytes_name),
            (os.listdir, bytes_name),
            (os.lstat, bytes_name),
            (os.mkdir, bytes_name),
            (os.open, bytes_name, os.O_RDONLY),
            (os.rename, bytes_name, bytes_name),
            (os.rmdir, bytes_name),
            (os.startfile, bytes_name),
            (os.stat, bytes_name),
            (os.unlink, bytes_name),
            (os.utime, bytes_name),
        )
        with warnings.catch_warnings():
            # Turn the warning into an exception so assertRaises can see it.
            warnings.simplefilter("error", DeprecationWarning)
            for func, *args in deprecated_calls:
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            self.assertRaises(DeprecationWarning,
                              os.symlink, bytes_name, bytes_name)
1841
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001842
@support.reap_threads
def test_main():
    """Run every test case class of this module through support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
    )
    support.run_unittest(*test_classes)


# Allow running this file directly as a script.
if __name__ == "__main__":
    test_main()