blob: e44174c126e8f757c6852ea5e419821e986d5333 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000025try:
26 import threading
27except ImportError:
28 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000029
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
# A missing sys.thread_info, or a falsy version field, counts as
# "not linuxthreads".
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)
Brian Curtineb24d742010-04-12 17:16:38 +000038
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for the file-descriptor level functions of the os module.

    Every test works on support.TESTFN, which is removed (if present)
    before and after each test.
    """

    def setUp(self):
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same "remove TESTFN if it exists" step serves as cleanup.
    tearDown = setUp

    def test_access(self):
        # A file we just created must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            # Only 'first' must stay open for the closerange() call below.
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must not change the reference count of
        # its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() returns exactly the bytes written, as a bytes object.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a check above fails, so a
            # failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination.
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
145
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200146
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for os.stat()/os.statvfs() results and the os.utime() family.

    The fixture is a directory support.TESTFN that holds a single
    three-byte file, self.fname.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        """Check the os.stat() result for *fname* (the fixture file)."""
        if not hasattr(os, "stat"):
            return

        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                # The time attributes are compared after int()
                # truncation against the indexed value.
                if name.endswith("TIME"):
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                 result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Indexing far past the end must fail.
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        self.assertRaises(TypeError, os.stat_result, (10,))

        # Use the constructor with a too-long tuple.  Both outcomes
        # (TypeError or success) are accepted here.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        # Same checks, with the file name passed as bytes.
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            return

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest("os.statvfs() failed with ENOSYS")
            # Any other error is a genuine failure; re-raise it instead of
            # falling through to a NameError on the unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        self.assertRaises(TypeError, os.statvfs_result, (10,))

        # Use the constructor with a too-long tuple.  Both outcomes
        # (TypeError or success) are accepted here.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def test_utime_noargs(self):
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st = os.stat(support.TESTFN)
        # Doesn't set anything new, but sets the time tuple way
        os.utime(support.TESTFN, (st.st_atime, st.st_mtime))
        # Set to the current time in the old explicit way.
        os.utime(support.TESTFN, None)
        st1 = os.stat(support.TESTFN)
        # Set to the current time in the new way
        os.utime(support.TESTFN)
        st2 = os.stat(support.TESTFN)
        self.assertAlmostEqual(st1.st_mtime, st2.st_mtime, delta=10)

    def _test_utime_subsecond(self, set_time_func):
        """Check that sub-second timestamps survive a set/stat round trip.

        *set_time_func(filename, atime, mtime)* must set both times on the
        file; os.stat() is then expected to report them to three decimal
        places.
        """
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        # Reset both times first so stale values cannot satisfy the check.
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimes'),
                         "os.futimes required for this test.")
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.futimes(f.fileno(), (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimens'),
                         "os.futimens required for this test.")
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                # Split each float into (seconds, nanoseconds).
                asec, ansec = divmod(atime, 1.0)
                asec = int(asec)
                ansec = int(ansec * 1e9)
                msec, mnsec = divmod(mtime, 1.0)
                msec = int(msec)
                mnsec = int(mnsec * 1e9)
                os.futimens(f.fileno(),
                            (asec, ansec),
                            (msec, mnsec))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimesat'),
                         "os.futimesat required for this test.")
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.futimesat(dirfd, os.path.basename(filename),
                             (atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'lutimes'),
                         "os.lutimes required for this test.")
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.lutimes(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'utimensat'),
                         "os.utimensat required for this test.")
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # Split each float into (seconds, nanoseconds).
                asec, ansec = divmod(atime, 1.0)
                asec = int(asec)
                ansec = int(ansec * 1e9)
                msec, mnsec = divmod(mtime, 1.0)
                msec = int(msec)
                mnsec = int(mnsec * 1e9)
                os.utimensat(dirfd, os.path.basename(filename),
                             (asec, ansec),
                             (msec, mnsec))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Called while the class body executes (not a method): returns
            # the file system name of the volume holding *path*, or None
            # when GetVolumeInformationW reports failure.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == errno.ENOENT:
                    # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
414
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000415from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000416
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol.

    The process environment (and os.environb, where supported) is saved in
    setUp() and restored in tearDown(), so tests may modify it freely.
    """
    type2test = None

    def setUp(self):
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        """Return the key/value pairs that setUp() puts into os.environ."""
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        """Return os.environ after clearing it."""
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_update2(self):
        # Skipped (rather than silently passing) when there is no /bin/sh
        # to observe the updated environment with.
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_os_popen_iter(self):
        # The object returned by os.popen() must be iterable line by line.
        with os.popen(
            "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
            it = iter(popen)
            self.assertEqual(next(it), "line1\n")
            self.assertEqual(next(it), "line2\n")
            self.assertEqual(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily rebind os.environ itself to a plain dict.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100557
class WalkTests(unittest.TestCase):
    """Tests for os.walk().

    setUp() only builds the directory tree under support.TESTFN; the
    assertions live in the test_* methods so that they are collected and
    reported as tests, and so that a failing assertion cannot abort the
    fixture and leave the tree behind.
    """

    def setUp(self):
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        if support.can_symlink():
            if os.name == 'nt':
                def symlink_to_dir(src, dest):
                    os.symlink(src, dest, True)
            else:
                symlink_to_dir = os.symlink
            symlink_to_dir(os.path.abspath(t2_path), self.link_path)
            # Expected os.walk() entry for SUB2, with the link...
            self.sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            # ...and without it when symlinks cannot be created.
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(os.walk(self.walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TEST1, SUB1, SUB11, SUB2
        #     flipped:  TEST1, SUB2, SUB1, SUB11
        # 'flipped' is a bool used as a 0/1 index offset below.
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(self.walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(os.walk(self.walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TEST1
        #     flipped:  SUB2, SUB11, SUB1, TEST1
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")
        # Walk, following symlinks.
        for root, dirs, files in os.walk(self.walk_path, followlinks=True):
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is removed as a file, not with
                # rmdir().
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000670
Charles-François Natali7372b062012-02-05 15:15:38 +0100671
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def test_compare_to_walk(self):
        # fwalk() has to report the same directories and files as walk()
        # for every combination of the topdown and followlinks flags.
        for flags in itertools.product((True, False), repeat=2):
            topdown, followlinks = flags
            walk_args = (support.TESTFN, topdown, None, followlinks)
            expected = {
                root: (set(dirs), set(files))
                for root, dirs, files in os.walk(*walk_args)
            }

            for root, dirs, files, rootfd in os.fwalk(*walk_args):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_dir_fd(self):
        # The descriptor yielded with each directory must be usable.
        for flags in itertools.product((True, False), repeat=2):
            topdown, followlinks = flags
            walk_args = (support.TESTFN, topdown, None, followlinks)
            for root, dirs, files, rootfd in os.fwalk(*walk_args):
                # fstat() raises if the FD is not valid
                os.fstat(rootfd)
                # flistdir() on the FD must agree with the yielded names
                listed = set(os.flistdir(rootfd))
                self.assertEqual(listed, set(dirs).union(files))

    def test_fd_leak(self):
        # fwalk() opens many FDs, so leaks have to be ruled out.  Two things
        # are checked: walking the tree a large number of times does not run
        # into EMFILE, and the lowest free FD is the same before and after.
        lowest_fd = os.dup(1)
        os.close(lowest_fd)
        for _ in range(256):
            for _entry in os.fwalk(support.TESTFN):
                pass
        probe_fd = os.dup(1)
        self.addCleanup(os.close, probe_fd)
        self.assertEqual(probe_fd, lowest_fd)

    def tearDown(self):
        # Remove the tree bottom-up, addressing every entry relative to the
        # directory descriptor handed out by fwalk().
        walker = os.fwalk(support.TESTFN, topdown=False)
        for root, dirs, files, rootfd in walker:
            for filename in files:
                os.unlinkat(rootfd, filename)
            for subdir in dirs:
                info = os.fstatat(rootfd, subdir, os.AT_SYMLINK_NOFOLLOW)
                if not stat.S_ISDIR(info.st_mode):
                    # not a real directory (the stat does not follow
                    # symlinks): unlink it like a file
                    os.unlinkat(rootfd, subdir)
                else:
                    os.unlinkat(rootfd, subdir, os.AT_REMOVEDIR)
        os.rmdir(support.TESTFN)
723
724
Guido van Rossume7ba4952007-06-06 23:52:48 +0000725class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000726 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000727 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000728
729 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000730 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000731 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
732 os.makedirs(path) # Should work
733 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
734 os.makedirs(path)
735
736 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000737 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000738 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
739 os.makedirs(path)
740 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
741 'dir5', 'dir6')
742 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000743
Terry Reedy5a22b652010-12-02 07:05:56 +0000744 def test_exist_ok_existing_directory(self):
745 path = os.path.join(support.TESTFN, 'dir1')
746 mode = 0o777
747 old_mask = os.umask(0o022)
748 os.makedirs(path, mode)
749 self.assertRaises(OSError, os.makedirs, path, mode)
750 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
751 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
752 os.makedirs(path, mode=mode, exist_ok=True)
753 os.umask(old_mask)
754
755 def test_exist_ok_existing_regular_file(self):
756 base = support.TESTFN
757 path = os.path.join(support.TESTFN, 'dir1')
758 f = open(path, 'w')
759 f.write('abc')
760 f.close()
761 self.assertRaises(OSError, os.makedirs, path)
762 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
763 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
764 os.remove(path)
765
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000766 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000767 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000768 'dir4', 'dir5', 'dir6')
769 # If the tests failed, the bottom-most directory ('../dir6')
770 # may not have been created, so we look for the outermost directory
771 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000772 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000773 path = os.path.dirname(path)
774
775 os.removedirs(path)
776
class DevNullTests(unittest.TestCase):
    """Check that os.devnull accepts writes and reads back as empty."""

    def test_devnull(self):
        # Writing to the null device must succeed...
        with open(os.devnull, 'wb') as sink:
            sink.write(b'hello')
            sink.close()
        # ...and reading from it must return no data at all.
        with open(os.devnull, 'rb') as source:
            content = source.read()
        self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000784
class URandomTests(unittest.TestCase):
    """Check that os.urandom(n) returns exactly n bytes."""

    def test_urandom(self):
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            # os.urandom() is unavailable here; nothing to check
            pass
794
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000795@contextlib.contextmanager
796def _execvpe_mockup(defpath=None):
797 """
798 Stubs out execv and execve functions when used as context manager.
799 Records exec calls. The mock execv and execve functions always raise an
800 exception as they would normally never return.
801 """
802 # A list of tuples containing (function name, first arg, args)
803 # of calls to execv or execve that have been made.
804 calls = []
805
806 def mock_execv(name, *args):
807 calls.append(('execv', name, args))
808 raise RuntimeError("execv called")
809
810 def mock_execve(name, *args):
811 calls.append(('execve', name, args))
812 raise OSError(errno.ENOTDIR, "execve called")
813
814 try:
815 orig_execv = os.execv
816 orig_execve = os.execve
817 orig_defpath = os.defpath
818 os.execv = mock_execv
819 os.execve = mock_execve
820 if defpath is not None:
821 os.defpath = defpath
822 yield calls
823 finally:
824 os.execv = orig_execv
825 os.execve = orig_execve
826 os.defpath = orig_defpath
827
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # an empty argument list is rejected before anything is executed
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with str or bytes program names.

        *test_type* is either ``str`` or ``bytes``.  The real exec functions
        are stubbed out by _execvpe_mockup(), so only the calls that
        os._execvpe() would have made are inspected.
        """
        search_dir = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            # everywhere except on "nt" the recorded path is the
            # fsencode()d form of the full path
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            self.assertRaises(RuntimeError,
                              os._execvpe, fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = search_dir
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # the bytes variant is not exercised on "nt"
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000891
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000892
Thomas Wouters477c8d52006-05-27 19:21:47 +0000893class Win32ErrorTests(unittest.TestCase):
894 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000895 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000896
897 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000898 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000899
900 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000901 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000902
903 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000904 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +0000905 try:
906 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
907 finally:
908 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000909 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000910
911 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000912 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000913
Thomas Wouters477c8d52006-05-27 19:21:47 +0000914 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +0000915 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000916
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000917class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000918 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000919 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
920 #singles.append("close")
921 #We omit close because it doesn'r raise an exception on some platforms
922 def get_single(f):
923 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000924 if hasattr(os, f):
925 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000926 return helper
927 for f in singles:
928 locals()["test_"+f] = get_single(f)
929
Benjamin Peterson7522c742009-01-19 21:00:09 +0000930 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000931 try:
932 f(support.make_bad_fd(), *args)
933 except OSError as e:
934 self.assertEqual(e.errno, errno.EBADF)
935 else:
936 self.fail("%r didn't raise a OSError with a bad file descriptor"
937 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000938
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000939 def test_isatty(self):
940 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000941 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000942
943 def test_closerange(self):
944 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000945 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000946 # Make sure none of the descriptors we are about to close are
947 # currently valid (issue 6542).
948 for i in range(10):
949 try: os.fstat(fd+i)
950 except OSError:
951 pass
952 else:
953 break
954 if i < 2:
955 raise unittest.SkipTest(
956 "Unable to acquire a range of invalid file descriptors")
957 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000958
959 def test_dup2(self):
960 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000961 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000962
963 def test_fchmod(self):
964 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000965 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000966
967 def test_fchown(self):
968 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000969 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000970
971 def test_fpathconf(self):
972 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000973 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000974
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000975 def test_ftruncate(self):
976 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000977 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000978
979 def test_lseek(self):
980 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000981 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000982
983 def test_read(self):
984 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000985 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000986
987 def test_tcsetpgrpt(self):
988 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000989 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000990
991 def test_write(self):
992 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000993 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000994
Brian Curtin1b9df392010-11-24 20:24:31 +0000995
996class LinkTests(unittest.TestCase):
997 def setUp(self):
998 self.file1 = support.TESTFN
999 self.file2 = os.path.join(support.TESTFN + "2")
1000
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001001 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001002 for file in (self.file1, self.file2):
1003 if os.path.exists(file):
1004 os.unlink(file)
1005
Brian Curtin1b9df392010-11-24 20:24:31 +00001006 def _test_link(self, file1, file2):
1007 with open(file1, "w") as f1:
1008 f1.write("test")
1009
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001010 with warnings.catch_warnings():
1011 warnings.simplefilter("ignore", DeprecationWarning)
1012 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001013 with open(file1, "r") as f1, open(file2, "r") as f2:
1014 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1015
1016 def test_link(self):
1017 self._test_link(self.file1, self.file2)
1018
1019 def test_link_bytes(self):
1020 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1021 bytes(self.file2, sys.getfilesystemencoding()))
1022
Brian Curtinf498b752010-11-30 15:54:04 +00001023 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001024 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001025 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001026 except UnicodeError:
1027 raise unittest.SkipTest("Unable to encode for this platform.")
1028
Brian Curtinf498b752010-11-30 15:54:04 +00001029 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001030 self.file2 = self.file1 + "2"
1031 self._test_link(self.file1, self.file2)
1032
Thomas Wouters477c8d52006-05-27 19:21:47 +00001033if sys.platform != 'win32':
1034 class Win32ErrorTests(unittest.TestCase):
1035 pass
1036
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001037 class PosixUidGidTests(unittest.TestCase):
1038 if hasattr(os, 'setuid'):
1039 def test_setuid(self):
1040 if os.getuid() != 0:
1041 self.assertRaises(os.error, os.setuid, 0)
1042 self.assertRaises(OverflowError, os.setuid, 1<<32)
1043
1044 if hasattr(os, 'setgid'):
1045 def test_setgid(self):
1046 if os.getuid() != 0:
1047 self.assertRaises(os.error, os.setgid, 0)
1048 self.assertRaises(OverflowError, os.setgid, 1<<32)
1049
1050 if hasattr(os, 'seteuid'):
1051 def test_seteuid(self):
1052 if os.getuid() != 0:
1053 self.assertRaises(os.error, os.seteuid, 0)
1054 self.assertRaises(OverflowError, os.seteuid, 1<<32)
1055
1056 if hasattr(os, 'setegid'):
1057 def test_setegid(self):
1058 if os.getuid() != 0:
1059 self.assertRaises(os.error, os.setegid, 0)
1060 self.assertRaises(OverflowError, os.setegid, 1<<32)
1061
1062 if hasattr(os, 'setreuid'):
1063 def test_setreuid(self):
1064 if os.getuid() != 0:
1065 self.assertRaises(os.error, os.setreuid, 0, 0)
1066 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1067 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001068
1069 def test_setreuid_neg1(self):
1070 # Needs to accept -1. We run this in a subprocess to avoid
1071 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001072 subprocess.check_call([
1073 sys.executable, '-c',
1074 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001075
1076 if hasattr(os, 'setregid'):
1077 def test_setregid(self):
1078 if os.getuid() != 0:
1079 self.assertRaises(os.error, os.setregid, 0, 0)
1080 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1081 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001082
1083 def test_setregid_neg1(self):
1084 # Needs to accept -1. We run this in a subprocess to avoid
1085 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001086 subprocess.check_call([
1087 sys.executable, '-c',
1088 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001089
1090 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001091 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001092 if support.TESTFN_UNENCODABLE:
1093 self.dir = support.TESTFN_UNENCODABLE
1094 else:
1095 self.dir = support.TESTFN
1096 self.bdir = os.fsencode(self.dir)
1097
1098 bytesfn = []
1099 def add_filename(fn):
1100 try:
1101 fn = os.fsencode(fn)
1102 except UnicodeEncodeError:
1103 return
1104 bytesfn.append(fn)
1105 add_filename(support.TESTFN_UNICODE)
1106 if support.TESTFN_UNENCODABLE:
1107 add_filename(support.TESTFN_UNENCODABLE)
1108 if not bytesfn:
1109 self.skipTest("couldn't create any non-ascii filename")
1110
1111 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001112 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001113 try:
1114 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +02001115 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +00001116 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001117 if fn in self.unicodefn:
1118 raise ValueError("duplicate filename")
1119 self.unicodefn.add(fn)
1120 except:
1121 shutil.rmtree(self.dir)
1122 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001123
1124 def tearDown(self):
1125 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001126
1127 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001128 expected = self.unicodefn
1129 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001130 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001131
1132 def test_open(self):
1133 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001134 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +00001135 f.close()
1136
1137 def test_stat(self):
1138 for fn in self.unicodefn:
1139 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001140else:
1141 class PosixUidGidTests(unittest.TestCase):
1142 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001143 class Pep383Tests(unittest.TestCase):
1144 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001145
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        child_code = ("import sys;"
                      "sys.stdout.write('{}');"
                      "sys.stdout.flush();"
                      "input()").format(msg)
        proc = subprocess.Popen([sys.executable, "-c", child_code],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        for pipe in (proc.stdout, proc.stderr, proc.stdin):
            self.addCleanup(pipe.close)

        # Poll the pipe up to max_attempts times, 0.1 seconds apart.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            pipe_handle = msvcrt.get_osfhandle(proc.stdout.fileno())
            rslt = PeekNamedPipe(pipe_handle, buf, ctypes.sizeof(buf),
                                 None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # the child exited early or never wrote the message
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # One byte of tagged shared memory, zeroed here; its tag name is
        # passed to the child, and the loop below waits for the byte to
        # become 1 (presumably set by win_console_handler.py once it is
        # ready -- that script is not part of this file).
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        m[0] = 0
        # Run a script which has console control handling enabled.
        script = os.path.join(os.path.dirname(__file__),
                              "win_console_handler.py")
        proc = subprocess.Popen([sys.executable, script, tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1260
1261
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and link-aware os calls on Windows.

    The links are created under relative names, so they end up in the
    current working directory; tearDown() removes them again.
    """
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: both targets exist, none of the link names do.
        assert os.path.exists(self.dirlink_target)
        assert os.path.exists(self.filelink_target)
        assert not os.path.exists(self.dirlink)
        assert not os.path.exists(self.filelink)
        assert not os.path.exists(self.missing_link)

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): the dangling link's target is missing, so exists()
        # would not see it
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink, True)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check stat()/lstat() on *link* against stat() on *target*.

        stat() of the link must equal stat() of the target while lstat()
        of the link must differ, for both the str and the bytes link name.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # os.stat() through a symlink with a relative target must give the
        # same result whatever the current directory is (the test name
        # presumably refers to tracker issue 12084).
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Pure path computation, done up front so the name is always bound.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            # Registered only once level1 exists, so the cleanup never runs
            # on a missing tree and cannot mask an earlier failure.  It also
            # removes file1 and the link created below.
            self.addCleanup(shutil.rmtree, level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                # leave the working directory before the tree is deleted
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
1379
Brian Curtind40e6f72010-07-08 21:39:08 +00001380
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() must come
        # back equal to the input.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        names = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in names:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here; nothing to round-trip.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001394
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001395
class PidTests(unittest.TestCase):
    """Checks for the process-id helpers of the os module."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid on stdout.
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1405
1406
# This TestCase produced at least two different errors on *nix buildbots
# when it was introduced, so it is skipped unconditionally for the time
# being to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Checks for os.getlogin()."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
1415
1416
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own priority value by one, read it back, then try to
        # put the original value back.
        which = os.PRIO_PROCESS
        original = os.getpriority(which, os.getpid())
        bumped = original + 1
        os.setpriority(which, os.getpid(), bumped)
        try:
            observed = os.getpriority(which, os.getpid())
            if original >= 19 and observed <= 19:
                raise unittest.SkipTest(
                    "unable to reliably test setpriority at current nice level of %s" % original)
            self.assertEqual(observed, bumped)
        finally:
            try:
                os.setpriority(which, os.getpid(), original)
            except OSError as exc:
                # Being refused with EACCES while restoring is tolerated;
                # every other error is reported.
                if exc.errno != errno.EACCES:
                    raise
1439
1440
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket whose asyncore loop runs in its own thread.

        Each accepted connection is wrapped in a Handler, and the most
        recently created one is available as ``handler_instance``.  The
        bound address is exposed as ``host`` and ``port``.
        """

        class Handler(asynchat.async_chat):
            """Connection channel that stores every chunk it receives."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting pushed to the peer as soon as it is accepted.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Propagate the active exception unchanged.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True between the start of run() and a call to stop()."""
            return self._active

        def start(self):
            """Start the thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to finish and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # "with" guarantees the lock is released even when the
                # loop iteration raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return False

        def handle_error(self):
            # Propagate the active exception unchanged.
            raise
1524
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001525
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() talking to a SendfileTestServer.

    setUpClass() writes DATA to support.TESTFN.  Every test then gets a
    started server, a client socket connected to it (``sockno``) and a
    read-only handle on the data file (``fileno``).
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are only used when the platform name
    # starts with none of these prefixes.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        The call is retried for as long as it fails with EAGAIN or EBUSY;
        any other OSError (ECONNRESET included) is propagated.  *headers*
        and *trailers* default to empty lists and are only passed on when
        SUPPORT_HEADERS_TRAILERS is true.  Returns what os.sendfile()
        returns.
        """
        # None stands in for "no headers/trailers" so that no mutable
        # default object is shared between calls.
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # EAGAIN / EBUSY: we have to retry sending the data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # a negative offset must be rejected with EINVAL
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(hash(data), hash(expected_data))

        def test_trailers(self):
            TESTFN2 = support.TESTFN + "2"
            # Registered before the file is created so that it is removed
            # whichever later step fails; support.unlink() is the same
            # helper tearDownClass() uses for the main data file.
            self.addCleanup(support.unlink, TESTFN2)
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcde12345")

    if hasattr(os, "SF_NODISKIO"):
        def test_flags(self):
            # EBUSY and EAGAIN are accepted outcomes with SF_NODISKIO.
            try:
                os.sendfile(self.sockno, self.fileno, 0, 4096,
                            flags=os.SF_NODISKIO)
            except OSError as err:
                if err.errno not in (errno.EBUSY, errno.EAGAIN):
                    raise
1697
1698
def supports_extended_attributes():
    """Return True if extended attributes can be tested on this system.

    False is returned when os.setxattr is missing, when setting an
    attribute on support.TESTFN fails with ENOTSUP, or when
    platform.release() reports a 2.6.x kernel older than 2.6.39.  Any
    other OSError from the probe is propagated.  The probe file is always
    removed again.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.fsetxattr(fp.fileno(), b"user.test", b"")
            except OSError as e:
                if e.errno != errno.ENOTSUP:
                    raise
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: only a literal "2.6." prefix matches.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
1716
1717
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Run one attribute scenario against the path, link and fd xattr APIs."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _assert_oserror(self, code, func, *args):
        """Assert that func(*args) raises OSError with errno == *code*."""
        with self.assertRaises(OSError) as caught:
            func(*args)
        self.assertEqual(caught.exception.errno, code)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
        """Exercise the four given functions on support.TESTFN.

        *s* converts each attribute name before it is passed on, except
        where the scenario deliberately passes a bytes or str literal.
        """
        path = support.TESTFN
        open(path, "wb").close()

        # Nothing has been set yet.
        self._assert_oserror(errno.ENODATA, getxattr, path, s("user.test"))
        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create an empty attribute and read it back.
        setxattr(path, s("user.test"), b"")
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test"), b"")

        # XATTR_REPLACE / XATTR_CREATE semantics.
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE)
        self.assertEqual(getxattr(path, b"user.test"), b"hello")
        self._assert_oserror(errno.EEXIST, setxattr,
                             path, s("user.test"), b"bye", os.XATTR_CREATE)
        self._assert_oserror(errno.ENODATA, setxattr,
                             path, s("user.test2"), b"bye", os.XATTR_REPLACE)
        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removal.
        removexattr(path, s("user.test"))
        self._assert_oserror(errno.ENODATA, getxattr, path, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2")), b"foo")

        # A 1024-byte value.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value)
        self.assertEqual(getxattr(path, s("user.test")), b"a"*1024)
        removexattr(path, s("user.test"))

        # One hundred attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for name in many:
            setxattr(path, name, b"x")
        self.assertEqual(set(listxattr(path)), set(initial) | set(many))

    def _check_xattrs(self, *args):
        """Run the scenario with str names, then again with bytes names."""
        def to_ascii_bytes(name):
            return name.encode("ascii")
        self._check_xattrs_str(str, *args)
        # start the second pass from a fresh file
        support.unlink(support.TESTFN)
        self._check_xattrs_str(to_ascii_bytes, *args)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
                           os.llistxattr)

    def test_fds(self):
        def via_fd(func, mode):
            # Adapt an fd-based function to the path-based call shape
            # used by _check_xattrs(): open, call, close.
            def call(path, *args):
                with open(path, mode) as fp:
                    return func(fp.fileno(), *args)
            return call

        self._check_xattrs(via_fd(os.fgetxattr, "rb"),
                           via_fd(os.fsetxattr, "wb"),
                           via_fd(os.fremovexattr, "wb"),
                           via_fd(os.flistxattr, "rb"))
1793
1794
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Check that bytes filenames raise DeprecationWarning on win32."""

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            # Turn the warning into an exception so it can be asserted on.
            warnings.simplefilter("error", DeprecationWarning)
            # Each entry is the callable followed by its arguments.
            calls = (
                (nt._getfullpathname, filename),
                (nt._isdir, filename),
                (os.access, filename, os.R_OK),
                (os.chdir, filename),
                (os.chmod, filename, 0o777),
                (os.getcwdb,),
                (os.link, filename, filename),
                (os.listdir, filename),
                (os.lstat, filename),
                (os.mkdir, filename),
                (os.open, filename, os.O_RDONLY),
                (os.rename, filename, filename),
                (os.rmdir, filename),
                (os.startfile, filename),
                (os.stat, filename),
                (os.unlink, filename),
                (os.utime, filename),
            )
            for call in calls:
                func, args = call[0], call[1:]
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(filename, filename)
1830
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001831
@support.reap_threads
def test_main():
    """Run the test case classes listed below via support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
    )
    support.run_unittest(*test_classes)


# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()