blob: a591f4bfcda708876ec1a45b7a34724e5394e4de [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000025try:
26 import threading
27except ImportError:
28 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000029
Mark Dickinson7cf03892010-04-16 13:45:35 +000030# Detect whether we're on a Linux system that uses the (now outdated
31# and unmaintained) linuxthreads threading library. There's an issue
32# when combining linuxthreads with a failed execv call: see
33# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020034if hasattr(sys, 'thread_info') and sys.thread_info.version:
35 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
36else:
37 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000038
Thomas Wouters0e3f5912006-08-11 14:57:12 +000039# Tests creating TESTFN
40class FileTests(unittest.TestCase):
41 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000042 if os.path.exists(support.TESTFN):
43 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000044 tearDown = setUp
45
46 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000047 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000048 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000049 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000050
Christian Heimesfdab48e2008-01-20 09:06:41 +000051 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000052 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
53 # We must allocate two consecutive file descriptors, otherwise
54 # it will mess up other file descriptors (perhaps even the three
55 # standard ones).
56 second = os.dup(first)
57 try:
58 retries = 0
59 while second != first + 1:
60 os.close(first)
61 retries += 1
62 if retries > 10:
63 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000064 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000065 first, second = second, os.dup(second)
66 finally:
67 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000068 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000069 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000070 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000071
Benjamin Peterson1cc6df92010-06-30 17:39:45 +000072 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +000073 def test_rename(self):
74 path = support.TESTFN
75 old = sys.getrefcount(path)
76 self.assertRaises(TypeError, os.rename, path, 0)
77 new = sys.getrefcount(path)
78 self.assertEqual(old, new)
79
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000080 def test_read(self):
81 with open(support.TESTFN, "w+b") as fobj:
82 fobj.write(b"spam")
83 fobj.flush()
84 fd = fobj.fileno()
85 os.lseek(fd, 0, 0)
86 s = os.read(fd, 4)
87 self.assertEqual(type(s), bytes)
88 self.assertEqual(s, b"spam")
89
90 def test_write(self):
91 # os.write() accepts bytes- and buffer-like objects but not strings
92 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
93 self.assertRaises(TypeError, os.write, fd, "beans")
94 os.write(fd, b"bacon\n")
95 os.write(fd, bytearray(b"eggs\n"))
96 os.write(fd, memoryview(b"spam\n"))
97 os.close(fd)
98 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +000099 self.assertEqual(fobj.read().splitlines(),
100 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000101
Victor Stinnere0daff12011-03-20 23:36:35 +0100102 def write_windows_console(self, *args):
103 retcode = subprocess.call(args,
104 # use a new console to not flood the test output
105 creationflags=subprocess.CREATE_NEW_CONSOLE,
106 # use a shell to hide the console window (SW_HIDE)
107 shell=True)
108 self.assertEqual(retcode, 0)
109
110 @unittest.skipUnless(sys.platform == 'win32',
111 'test specific to the Windows console')
112 def test_write_windows_console(self):
113 # Issue #11395: the Windows console returns an error (12: not enough
114 # space error) on writing into stdout if stdout mode is binary and the
115 # length is greater than 66,000 bytes (or less, depending on heap
116 # usage).
117 code = "print('x' * 100000)"
118 self.write_windows_console(sys.executable, "-c", code)
119 self.write_windows_console(sys.executable, "-u", "-c", code)
120
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000121 def fdopen_helper(self, *args):
122 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200123 f = os.fdopen(fd, *args)
124 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000125
126 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200127 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
128 os.close(fd)
129
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000130 self.fdopen_helper()
131 self.fdopen_helper('r')
132 self.fdopen_helper('r', 100)
133
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100134 def test_replace(self):
135 TESTFN2 = support.TESTFN + ".2"
136 with open(support.TESTFN, 'w') as f:
137 f.write("1")
138 with open(TESTFN2, 'w') as f:
139 f.write("2")
140 self.addCleanup(os.unlink, TESTFN2)
141 os.replace(support.TESTFN, TESTFN2)
142 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
143 with open(TESTFN2, 'r') as f:
144 self.assertEqual(f.read(), "1")
145
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200146
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000147# Test attributes on return values from os.*stat* family.
148class StatAttributeTests(unittest.TestCase):
149 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000150 os.mkdir(support.TESTFN)
151 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000152 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000153 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000154 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000155
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000156 def tearDown(self):
157 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000158 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000159
Antoine Pitrou38425292010-09-21 18:19:07 +0000160 def check_stat_attributes(self, fname):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000161 if not hasattr(os, "stat"):
162 return
163
Antoine Pitrou38425292010-09-21 18:19:07 +0000164 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000165
166 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000167 self.assertEqual(result[stat.ST_SIZE], 3)
168 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000169
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000170 # Make sure all the attributes are there
171 members = dir(result)
172 for name in dir(stat):
173 if name[:3] == 'ST_':
174 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000175 if name.endswith("TIME"):
176 def trunc(x): return int(x)
177 else:
178 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000179 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000180 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000181 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000182
183 try:
184 result[200]
185 self.fail("No exception thrown")
186 except IndexError:
187 pass
188
189 # Make sure that assignment fails
190 try:
191 result.st_mode = 1
192 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000193 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000194 pass
195
196 try:
197 result.st_rdev = 1
198 self.fail("No exception thrown")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000199 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000200 pass
201
202 try:
203 result.parrot = 1
204 self.fail("No exception thrown")
205 except AttributeError:
206 pass
207
208 # Use the stat_result constructor with a too-short tuple.
209 try:
210 result2 = os.stat_result((10,))
211 self.fail("No exception thrown")
212 except TypeError:
213 pass
214
Ezio Melotti42da6632011-03-15 05:18:48 +0200215 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000216 try:
217 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
218 except TypeError:
219 pass
220
Antoine Pitrou38425292010-09-21 18:19:07 +0000221 def test_stat_attributes(self):
222 self.check_stat_attributes(self.fname)
223
224 def test_stat_attributes_bytes(self):
225 try:
226 fname = self.fname.encode(sys.getfilesystemencoding())
227 except UnicodeEncodeError:
228 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100229 with warnings.catch_warnings():
230 warnings.simplefilter("ignore", DeprecationWarning)
231 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000232
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000233 def test_statvfs_attributes(self):
234 if not hasattr(os, "statvfs"):
235 return
236
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000237 try:
238 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000239 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000240 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000241 if e.errno == errno.ENOSYS:
242 return
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000243
244 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000245 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000246
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000247 # Make sure all the attributes are there.
248 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
249 'ffree', 'favail', 'flag', 'namemax')
250 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000251 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000252
253 # Make sure that assignment really fails
254 try:
255 result.f_bfree = 1
256 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000257 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000258 pass
259
260 try:
261 result.parrot = 1
262 self.fail("No exception thrown")
263 except AttributeError:
264 pass
265
266 # Use the constructor with a too-short tuple.
267 try:
268 result2 = os.statvfs_result((10,))
269 self.fail("No exception thrown")
270 except TypeError:
271 pass
272
Ezio Melotti42da6632011-03-15 05:18:48 +0200273 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000274 try:
275 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
276 except TypeError:
277 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000278
Thomas Wouters89f507f2006-12-13 04:49:30 +0000279 def test_utime_dir(self):
280 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000281 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000282 # round to int, because some systems may support sub-second
283 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000284 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
285 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000286 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000287
Brian Curtin52fbea12011-11-06 13:41:17 -0600288 def test_utime_noargs(self):
Brian Curtin0277aa32011-11-06 13:50:15 -0600289 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600290 # second argument. Check that the previous methods of passing
291 # a time tuple or None work in addition to no argument.
292 st = os.stat(support.TESTFN)
293 # Doesn't set anything new, but sets the time tuple way
294 os.utime(support.TESTFN, (st.st_atime, st.st_mtime))
295 # Set to the current time in the old explicit way.
296 os.utime(support.TESTFN, None)
297 st1 = os.stat(support.TESTFN)
298 # Set to the current time in the new way
299 os.utime(support.TESTFN)
300 st2 = os.stat(support.TESTFN)
301 self.assertAlmostEqual(st1.st_mtime, st2.st_mtime, delta=10)
302
Victor Stinnerbe557de2012-02-08 03:01:11 +0100303 def test_utime_subsecond(self):
304 asec, amsec = 1, 901
305 atime = asec + amsec * 1e-3
306 msec, mmsec = 5, 901
307 mtime = msec + mmsec * 1e-3
308 filename = self.fname
309 dirname = os.path.dirname(filename)
Victor Stinner8b6f10d2012-02-08 03:07:25 +0100310 for func in ('utime', 'futimes', 'futimens', 'futimesat', 'lutimes', 'utimensat'):
Victor Stinnerbe557de2012-02-08 03:01:11 +0100311 if not hasattr(os, func):
312 continue
313 os.utime(filename, (0, 0))
314 if func == 'utime':
315 os.utime(filename, (atime, mtime))
316 elif func == 'futimes':
317 with open(filename, "wb") as f:
318 os.futimes(f.fileno(), (atime, mtime))
319 os.utime(filename, (atime, mtime))
320 elif func == 'futimens':
321 with open(filename, "wb") as f:
322 os.futimens(f.fileno(),
323 (asec, amsec * 1000000),
324 (msec, mmsec * 1000000))
325 elif func == 'lutimes':
326 os.lutimes(filename, (atime, mtime))
Victor Stinner8b6f10d2012-02-08 03:07:25 +0100327 elif func == 'futimesat':
328 dirfd = os.open(dirname, os.O_RDONLY)
329 try:
330 os.futimesat(dirfd, os.path.basename(filename),
331 (atime, mtime))
332 finally:
333 os.close(dirfd)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100334 else:
335 dirfd = os.open(dirname, os.O_RDONLY)
336 try:
337 os.utimensat(dirfd, os.path.basename(filename),
338 (asec, amsec * 1000000),
339 (msec, mmsec * 1000000))
340 finally:
341 os.close(dirfd)
342 st = os.stat(filename)
343 self.assertAlmostEqual(st.st_atime, atime, places=3)
344 self.assertAlmostEqual(st.st_mtime, mtime, places=3)
345
346
Thomas Wouters89f507f2006-12-13 04:49:30 +0000347 # Restrict test to Win32, since there is no guarantee other
348 # systems support centiseconds
349 if sys.platform == 'win32':
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000350 def get_file_system(path):
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000351 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000352 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000353 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000354 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000355 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000356 return buf.value
357
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000358 if get_file_system(support.TESTFN) == "NTFS":
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000359 def test_1565150(self):
360 t1 = 1159195039.25
361 os.utime(self.fname, (t1, t1))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000362 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000363
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000364 def test_large_time(self):
365 t1 = 5000000000 # some day in 2128
366 os.utime(self.fname, (t1, t1))
367 self.assertEqual(os.stat(self.fname).st_mtime, t1)
368
Guido van Rossumd8faa362007-04-27 19:54:29 +0000369 def test_1686475(self):
370 # Verify that an open file can be stat'ed
371 try:
372 os.stat(r"c:\pagefile.sys")
373 except WindowsError as e:
Benjamin Petersonc4fe6f32008-08-19 18:57:56 +0000374 if e.errno == 2: # file does not exist; cannot run test
Guido van Rossumd8faa362007-04-27 19:54:29 +0000375 return
376 self.fail("Could not stat pagefile.sys")
377
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000378from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000379
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000380class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000381 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000382 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000383
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000384 def setUp(self):
385 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000386 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000387 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000388 for key, value in self._reference().items():
389 os.environ[key] = value
390
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000391 def tearDown(self):
392 os.environ.clear()
393 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000394 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000395 os.environb.clear()
396 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000397
Christian Heimes90333392007-11-01 19:08:42 +0000398 def _reference(self):
399 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
400
401 def _empty_mapping(self):
402 os.environ.clear()
403 return os.environ
404
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000405 # Bug 1110478
Martin v. Löwis5510f652005-02-17 21:23:20 +0000406 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000407 os.environ.clear()
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000408 if os.path.exists("/bin/sh"):
409 os.environ.update(HELLO="World")
Brian Curtin810921b2010-10-30 21:24:21 +0000410 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
411 value = popen.read().strip()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000412 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000413
Christian Heimes1a13d592007-11-08 14:16:55 +0000414 def test_os_popen_iter(self):
415 if os.path.exists("/bin/sh"):
Brian Curtin810921b2010-10-30 21:24:21 +0000416 with os.popen(
417 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
418 it = iter(popen)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000419 self.assertEqual(next(it), "line1\n")
420 self.assertEqual(next(it), "line2\n")
421 self.assertEqual(next(it), "line3\n")
Brian Curtin810921b2010-10-30 21:24:21 +0000422 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000423
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000424 # Verify environ keys and values from the OS are of the
425 # correct str type.
426 def test_keyvalue_types(self):
427 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000428 self.assertEqual(type(key), str)
429 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000430
Christian Heimes90333392007-11-01 19:08:42 +0000431 def test_items(self):
432 for key, value in self._reference().items():
433 self.assertEqual(os.environ.get(key), value)
434
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000435 # Issue 7310
436 def test___repr__(self):
437 """Check that the repr() of os.environ looks like environ({...})."""
438 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000439 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
440 '{!r}: {!r}'.format(key, value)
441 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000442
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000443 def test_get_exec_path(self):
444 defpath_list = os.defpath.split(os.pathsep)
445 test_path = ['/monty', '/python', '', '/flying/circus']
446 test_env = {'PATH': os.pathsep.join(test_path)}
447
448 saved_environ = os.environ
449 try:
450 os.environ = dict(test_env)
451 # Test that defaulting to os.environ works.
452 self.assertSequenceEqual(test_path, os.get_exec_path())
453 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
454 finally:
455 os.environ = saved_environ
456
457 # No PATH environment variable
458 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
459 # Empty PATH environment variable
460 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
461 # Supplied PATH environment variable
462 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
463
Victor Stinnerb745a742010-05-18 17:17:23 +0000464 if os.supports_bytes_environ:
465 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000466 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000467 # ignore BytesWarning warning
468 with warnings.catch_warnings(record=True):
469 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000470 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000471 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000472 pass
473 else:
474 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000475
476 # bytes key and/or value
477 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
478 ['abc'])
479 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
480 ['abc'])
481 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
482 ['abc'])
483
484 @unittest.skipUnless(os.supports_bytes_environ,
485 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000486 def test_environb(self):
487 # os.environ -> os.environb
488 value = 'euro\u20ac'
489 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000490 value_bytes = value.encode(sys.getfilesystemencoding(),
491 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000492 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000493 msg = "U+20AC character is not encodable to %s" % (
494 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000495 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000496 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000497 self.assertEqual(os.environ['unicode'], value)
498 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000499
500 # os.environb -> os.environ
501 value = b'\xff'
502 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000503 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000504 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000505 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000506
Charles-François Natali2966f102011-11-26 11:32:46 +0100507 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
508 # #13415).
509 @support.requires_freebsd_version(7)
510 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100511 def test_unset_error(self):
512 if sys.platform == "win32":
513 # an environment variable is limited to 32,767 characters
514 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100515 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100516 else:
517 # "=" is not allowed in a variable name
518 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100519 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100520
Tim Petersc4e09402003-04-25 07:11:48 +0000521class WalkTests(unittest.TestCase):
522 """Tests for os.walk()."""
523
Charles-François Natali7372b062012-02-05 15:15:38 +0100524 def setUp(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000525 import os
526 from os.path import join
527
528 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000529 # TESTFN/
530 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000531 # tmp1
532 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000533 # tmp2
534 # SUB11/ no kids
535 # SUB2/ a file kid and a dirsymlink kid
536 # tmp3
537 # link/ a symlink to TESTFN.2
538 # TEST2/
539 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000540 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000541 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000542 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000543 sub2_path = join(walk_path, "SUB2")
544 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000545 tmp2_path = join(sub1_path, "tmp2")
546 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000547 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000548 t2_path = join(support.TESTFN, "TEST2")
549 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Tim Petersc4e09402003-04-25 07:11:48 +0000550
551 # Create stuff.
552 os.makedirs(sub11_path)
553 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000554 os.makedirs(t2_path)
555 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000556 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000557 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
558 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000559 if support.can_symlink():
Antoine Pitrou5311c1d2012-01-24 08:59:28 +0100560 if os.name == 'nt':
561 def symlink_to_dir(src, dest):
562 os.symlink(src, dest, True)
563 else:
564 symlink_to_dir = os.symlink
565 symlink_to_dir(os.path.abspath(t2_path), link_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000566 sub2_tree = (sub2_path, ["link"], ["tmp3"])
567 else:
568 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000569
570 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000571 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000572 self.assertEqual(len(all), 4)
573 # We can't know which order SUB1 and SUB2 will appear in.
574 # Not flipped: TESTFN, SUB1, SUB11, SUB2
575 # flipped: TESTFN, SUB2, SUB1, SUB11
576 flipped = all[0][1][0] != "SUB1"
577 all[0][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000578 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000579 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
580 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000581 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000582
583 # Prune the search.
584 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000585 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000586 all.append((root, dirs, files))
587 # Don't descend into SUB1.
588 if 'SUB1' in dirs:
589 # Note that this also mutates the dirs we appended to all!
590 dirs.remove('SUB1')
591 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000592 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
593 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000594
595 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000596 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000597 self.assertEqual(len(all), 4)
598 # We can't know which order SUB1 and SUB2 will appear in.
599 # Not flipped: SUB11, SUB1, SUB2, TESTFN
600 # flipped: SUB2, SUB11, SUB1, TESTFN
601 flipped = all[3][1][0] != "SUB1"
602 all[3][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000603 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000604 self.assertEqual(all[flipped], (sub11_path, [], []))
605 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000606 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000607
Brian Curtin3b4499c2010-12-28 14:31:47 +0000608 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000609 # Walk, following symlinks.
610 for root, dirs, files in os.walk(walk_path, followlinks=True):
611 if root == link_path:
612 self.assertEqual(dirs, [])
613 self.assertEqual(files, ["tmp4"])
614 break
615 else:
616 self.fail("Didn't follow symlink with followlinks=True")
617
618 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000619 # Tear everything down. This is a decent use for bottom-up on
620 # Windows, which doesn't have a recursive delete command. The
621 # (not so) subtlety is that rmdir will fail unless the dir's
622 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000623 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000624 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000625 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000626 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000627 dirname = os.path.join(root, name)
628 if not os.path.islink(dirname):
629 os.rmdir(dirname)
630 else:
631 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000632 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000633
Charles-François Natali7372b062012-02-05 15:15:38 +0100634
635@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
636class FwalkTests(WalkTests):
637 """Tests for os.fwalk()."""
638
639 def test_compare_to_walk(self):
640 # compare with walk() results
641 for topdown, followlinks in itertools.product((True, False), repeat=2):
642 args = support.TESTFN, topdown, None, followlinks
643 expected = {}
644 for root, dirs, files in os.walk(*args):
645 expected[root] = (set(dirs), set(files))
646
647 for root, dirs, files, rootfd in os.fwalk(*args):
648 self.assertIn(root, expected)
649 self.assertEqual(expected[root], (set(dirs), set(files)))
650
651 def test_dir_fd(self):
652 # check returned file descriptors
653 for topdown, followlinks in itertools.product((True, False), repeat=2):
654 args = support.TESTFN, topdown, None, followlinks
655 for root, dirs, files, rootfd in os.fwalk(*args):
656 # check that the FD is valid
657 os.fstat(rootfd)
Charles-François Natali77940902012-02-06 19:54:48 +0100658 # check that flistdir() returns consistent information
659 self.assertEqual(set(os.flistdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100660
661 def test_fd_leak(self):
662 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
663 # we both check that calling fwalk() a large number of times doesn't
664 # yield EMFILE, and that the minimum allocated FD hasn't changed.
665 minfd = os.dup(1)
666 os.close(minfd)
667 for i in range(256):
668 for x in os.fwalk(support.TESTFN):
669 pass
670 newfd = os.dup(1)
671 self.addCleanup(os.close, newfd)
672 self.assertEqual(newfd, minfd)
673
674 def tearDown(self):
675 # cleanup
676 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
677 for name in files:
678 os.unlinkat(rootfd, name)
679 for name in dirs:
680 st = os.fstatat(rootfd, name, os.AT_SYMLINK_NOFOLLOW)
681 if stat.S_ISDIR(st.st_mode):
682 os.unlinkat(rootfd, name, os.AT_REMOVEDIR)
683 else:
684 os.unlinkat(rootfd, name)
685 os.rmdir(support.TESTFN)
686
687
Guido van Rossume7ba4952007-06-06 23:52:48 +0000688class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000689 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000690 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000691
692 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000693 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000694 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
695 os.makedirs(path) # Should work
696 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
697 os.makedirs(path)
698
699 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000700 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000701 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
702 os.makedirs(path)
703 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
704 'dir5', 'dir6')
705 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000706
Terry Reedy5a22b652010-12-02 07:05:56 +0000707 def test_exist_ok_existing_directory(self):
708 path = os.path.join(support.TESTFN, 'dir1')
709 mode = 0o777
710 old_mask = os.umask(0o022)
711 os.makedirs(path, mode)
712 self.assertRaises(OSError, os.makedirs, path, mode)
713 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
714 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
715 os.makedirs(path, mode=mode, exist_ok=True)
716 os.umask(old_mask)
717
718 def test_exist_ok_existing_regular_file(self):
719 base = support.TESTFN
720 path = os.path.join(support.TESTFN, 'dir1')
721 f = open(path, 'w')
722 f.write('abc')
723 f.close()
724 self.assertRaises(OSError, os.makedirs, path)
725 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
726 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
727 os.remove(path)
728
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000729 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000730 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000731 'dir4', 'dir5', 'dir6')
732 # If the tests failed, the bottom-most directory ('../dir6')
733 # may not have been created, so we look for the outermost directory
734 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000735 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000736 path = os.path.dirname(path)
737
738 os.removedirs(path)
739
Guido van Rossume7ba4952007-06-06 23:52:48 +0000740class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000741 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +0200742 with open(os.devnull, 'wb') as f:
743 f.write(b'hello')
744 f.close()
745 with open(os.devnull, 'rb') as f:
746 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000747
Guido van Rossume7ba4952007-06-06 23:52:48 +0000748class URandomTests(unittest.TestCase):
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000749 def test_urandom(self):
750 try:
751 self.assertEqual(len(os.urandom(1)), 1)
752 self.assertEqual(len(os.urandom(10)), 10)
753 self.assertEqual(len(os.urandom(100)), 100)
754 self.assertEqual(len(os.urandom(1000)), 1000)
755 except NotImplementedError:
756 pass
757
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000758@contextlib.contextmanager
759def _execvpe_mockup(defpath=None):
760 """
761 Stubs out execv and execve functions when used as context manager.
762 Records exec calls. The mock execv and execve functions always raise an
763 exception as they would normally never return.
764 """
765 # A list of tuples containing (function name, first arg, args)
766 # of calls to execv or execve that have been made.
767 calls = []
768
769 def mock_execv(name, *args):
770 calls.append(('execv', name, args))
771 raise RuntimeError("execv called")
772
773 def mock_execve(name, *args):
774 calls.append(('execve', name, args))
775 raise OSError(errno.ENOTDIR, "execve called")
776
777 try:
778 orig_execv = os.execv
779 orig_execve = os.execve
780 orig_defpath = os.defpath
781 os.execv = mock_execv
782 os.execve = mock_execve
783 if defpath is not None:
784 os.defpath = defpath
785 yield calls
786 finally:
787 os.execv = orig_execv
788 os.execve = orig_execve
789 os.defpath = orig_defpath
790
Guido van Rossume7ba4952007-06-06 23:52:48 +0000791class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000792 @unittest.skipIf(USING_LINUXTHREADS,
793 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +0000794 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000795 self.assertRaises(OSError, os.execvpe, 'no such app-',
796 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +0000797
Thomas Heller6790d602007-08-30 17:15:14 +0000798 def test_execvpe_with_bad_arglist(self):
799 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
800
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000801 @unittest.skipUnless(hasattr(os, '_execvpe'),
802 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +0000803 def _test_internal_execvpe(self, test_type):
804 program_path = os.sep + 'absolutepath'
805 if test_type is bytes:
806 program = b'executable'
807 fullpath = os.path.join(os.fsencode(program_path), program)
808 native_fullpath = fullpath
809 arguments = [b'progname', 'arg1', 'arg2']
810 else:
811 program = 'executable'
812 arguments = ['progname', 'arg1', 'arg2']
813 fullpath = os.path.join(program_path, program)
814 if os.name != "nt":
815 native_fullpath = os.fsencode(fullpath)
816 else:
817 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000818 env = {'spam': 'beans'}
819
Victor Stinnerb745a742010-05-18 17:17:23 +0000820 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000821 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000822 self.assertRaises(RuntimeError,
823 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000824 self.assertEqual(len(calls), 1)
825 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
826
Victor Stinnerb745a742010-05-18 17:17:23 +0000827 # test os._execvpe() with a relative path:
828 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000829 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000830 self.assertRaises(OSError,
831 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000832 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +0000833 self.assertSequenceEqual(calls[0],
834 ('execve', native_fullpath, (arguments, env)))
835
836 # test os._execvpe() with a relative path:
837 # os.get_exec_path() reads the 'PATH' variable
838 with _execvpe_mockup() as calls:
839 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +0000840 if test_type is bytes:
841 env_path[b'PATH'] = program_path
842 else:
843 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +0000844 self.assertRaises(OSError,
845 os._execvpe, program, arguments, env=env_path)
846 self.assertEqual(len(calls), 1)
847 self.assertSequenceEqual(calls[0],
848 ('execve', native_fullpath, (arguments, env_path)))
849
850 def test_internal_execvpe_str(self):
851 self._test_internal_execvpe(str)
852 if os.name != "nt":
853 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000854
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000855
Thomas Wouters477c8d52006-05-27 19:21:47 +0000856class Win32ErrorTests(unittest.TestCase):
857 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000858 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000859
860 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000861 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000862
863 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000864 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000865
866 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000867 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +0000868 try:
869 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
870 finally:
871 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000872 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000873
874 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000875 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000876
Thomas Wouters477c8d52006-05-27 19:21:47 +0000877 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +0000878 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000879
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000880class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000881 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000882 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
883 #singles.append("close")
884 #We omit close because it doesn'r raise an exception on some platforms
885 def get_single(f):
886 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000887 if hasattr(os, f):
888 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000889 return helper
890 for f in singles:
891 locals()["test_"+f] = get_single(f)
892
Benjamin Peterson7522c742009-01-19 21:00:09 +0000893 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000894 try:
895 f(support.make_bad_fd(), *args)
896 except OSError as e:
897 self.assertEqual(e.errno, errno.EBADF)
898 else:
899 self.fail("%r didn't raise a OSError with a bad file descriptor"
900 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000901
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000902 def test_isatty(self):
903 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000904 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000905
906 def test_closerange(self):
907 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000908 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000909 # Make sure none of the descriptors we are about to close are
910 # currently valid (issue 6542).
911 for i in range(10):
912 try: os.fstat(fd+i)
913 except OSError:
914 pass
915 else:
916 break
917 if i < 2:
918 raise unittest.SkipTest(
919 "Unable to acquire a range of invalid file descriptors")
920 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000921
922 def test_dup2(self):
923 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000924 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000925
926 def test_fchmod(self):
927 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000928 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000929
930 def test_fchown(self):
931 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000932 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000933
934 def test_fpathconf(self):
935 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000936 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000937
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000938 def test_ftruncate(self):
939 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000940 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000941
942 def test_lseek(self):
943 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000944 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000945
946 def test_read(self):
947 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000948 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000949
950 def test_tcsetpgrpt(self):
951 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000952 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000953
954 def test_write(self):
955 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000956 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000957
Brian Curtin1b9df392010-11-24 20:24:31 +0000958
959class LinkTests(unittest.TestCase):
960 def setUp(self):
961 self.file1 = support.TESTFN
962 self.file2 = os.path.join(support.TESTFN + "2")
963
Brian Curtinc0abc4e2010-11-30 23:46:54 +0000964 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +0000965 for file in (self.file1, self.file2):
966 if os.path.exists(file):
967 os.unlink(file)
968
Brian Curtin1b9df392010-11-24 20:24:31 +0000969 def _test_link(self, file1, file2):
970 with open(file1, "w") as f1:
971 f1.write("test")
972
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100973 with warnings.catch_warnings():
974 warnings.simplefilter("ignore", DeprecationWarning)
975 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +0000976 with open(file1, "r") as f1, open(file2, "r") as f2:
977 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
978
979 def test_link(self):
980 self._test_link(self.file1, self.file2)
981
982 def test_link_bytes(self):
983 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
984 bytes(self.file2, sys.getfilesystemencoding()))
985
Brian Curtinf498b752010-11-30 15:54:04 +0000986 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +0000987 try:
Brian Curtinf498b752010-11-30 15:54:04 +0000988 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +0000989 except UnicodeError:
990 raise unittest.SkipTest("Unable to encode for this platform.")
991
Brian Curtinf498b752010-11-30 15:54:04 +0000992 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +0000993 self.file2 = self.file1 + "2"
994 self._test_link(self.file1, self.file2)
995
Thomas Wouters477c8d52006-05-27 19:21:47 +0000996if sys.platform != 'win32':
997 class Win32ErrorTests(unittest.TestCase):
998 pass
999
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001000 class PosixUidGidTests(unittest.TestCase):
1001 if hasattr(os, 'setuid'):
1002 def test_setuid(self):
1003 if os.getuid() != 0:
1004 self.assertRaises(os.error, os.setuid, 0)
1005 self.assertRaises(OverflowError, os.setuid, 1<<32)
1006
1007 if hasattr(os, 'setgid'):
1008 def test_setgid(self):
1009 if os.getuid() != 0:
1010 self.assertRaises(os.error, os.setgid, 0)
1011 self.assertRaises(OverflowError, os.setgid, 1<<32)
1012
1013 if hasattr(os, 'seteuid'):
1014 def test_seteuid(self):
1015 if os.getuid() != 0:
1016 self.assertRaises(os.error, os.seteuid, 0)
1017 self.assertRaises(OverflowError, os.seteuid, 1<<32)
1018
1019 if hasattr(os, 'setegid'):
1020 def test_setegid(self):
1021 if os.getuid() != 0:
1022 self.assertRaises(os.error, os.setegid, 0)
1023 self.assertRaises(OverflowError, os.setegid, 1<<32)
1024
1025 if hasattr(os, 'setreuid'):
1026 def test_setreuid(self):
1027 if os.getuid() != 0:
1028 self.assertRaises(os.error, os.setreuid, 0, 0)
1029 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1030 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001031
1032 def test_setreuid_neg1(self):
1033 # Needs to accept -1. We run this in a subprocess to avoid
1034 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001035 subprocess.check_call([
1036 sys.executable, '-c',
1037 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001038
1039 if hasattr(os, 'setregid'):
1040 def test_setregid(self):
1041 if os.getuid() != 0:
1042 self.assertRaises(os.error, os.setregid, 0, 0)
1043 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1044 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001045
1046 def test_setregid_neg1(self):
1047 # Needs to accept -1. We run this in a subprocess to avoid
1048 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001049 subprocess.check_call([
1050 sys.executable, '-c',
1051 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001052
1053 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001054 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001055 if support.TESTFN_UNENCODABLE:
1056 self.dir = support.TESTFN_UNENCODABLE
1057 else:
1058 self.dir = support.TESTFN
1059 self.bdir = os.fsencode(self.dir)
1060
1061 bytesfn = []
1062 def add_filename(fn):
1063 try:
1064 fn = os.fsencode(fn)
1065 except UnicodeEncodeError:
1066 return
1067 bytesfn.append(fn)
1068 add_filename(support.TESTFN_UNICODE)
1069 if support.TESTFN_UNENCODABLE:
1070 add_filename(support.TESTFN_UNENCODABLE)
1071 if not bytesfn:
1072 self.skipTest("couldn't create any non-ascii filename")
1073
1074 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001075 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001076 try:
1077 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +02001078 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +00001079 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001080 if fn in self.unicodefn:
1081 raise ValueError("duplicate filename")
1082 self.unicodefn.add(fn)
1083 except:
1084 shutil.rmtree(self.dir)
1085 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001086
1087 def tearDown(self):
1088 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001089
1090 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001091 expected = self.unicodefn
1092 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001093 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001094
1095 def test_open(self):
1096 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001097 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +00001098 f.close()
1099
1100 def test_stat(self):
1101 for fn in self.unicodefn:
1102 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001103else:
1104 class PosixUidGidTests(unittest.TestCase):
1105 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001106 class Pep383Tests(unittest.TestCase):
1107 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001108
Brian Curtineb24d742010-04-12 17:16:38 +00001109@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1110class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001111 def _kill(self, sig):
1112 # Start sys.executable as a subprocess and communicate from the
1113 # subprocess to the parent that the interpreter is ready. When it
1114 # becomes ready, send *sig* via os.kill to the subprocess and check
1115 # that the return code is equal to *sig*.
1116 import ctypes
1117 from ctypes import wintypes
1118 import msvcrt
1119
1120 # Since we can't access the contents of the process' stdout until the
1121 # process has exited, use PeekNamedPipe to see what's inside stdout
1122 # without waiting. This is done so we can tell that the interpreter
1123 # is started and running at a point where it could handle a signal.
1124 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1125 PeekNamedPipe.restype = wintypes.BOOL
1126 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1127 ctypes.POINTER(ctypes.c_char), # stdout buf
1128 wintypes.DWORD, # Buffer size
1129 ctypes.POINTER(wintypes.DWORD), # bytes read
1130 ctypes.POINTER(wintypes.DWORD), # bytes avail
1131 ctypes.POINTER(wintypes.DWORD)) # bytes left
1132 msg = "running"
1133 proc = subprocess.Popen([sys.executable, "-c",
1134 "import sys;"
1135 "sys.stdout.write('{}');"
1136 "sys.stdout.flush();"
1137 "input()".format(msg)],
1138 stdout=subprocess.PIPE,
1139 stderr=subprocess.PIPE,
1140 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001141 self.addCleanup(proc.stdout.close)
1142 self.addCleanup(proc.stderr.close)
1143 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001144
1145 count, max = 0, 100
1146 while count < max and proc.poll() is None:
1147 # Create a string buffer to store the result of stdout from the pipe
1148 buf = ctypes.create_string_buffer(len(msg))
1149 # Obtain the text currently in proc.stdout
1150 # Bytes read/avail/left are left as NULL and unused
1151 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1152 buf, ctypes.sizeof(buf), None, None, None)
1153 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1154 if buf.value:
1155 self.assertEqual(msg, buf.value.decode())
1156 break
1157 time.sleep(0.1)
1158 count += 1
1159 else:
1160 self.fail("Did not receive communication from the subprocess")
1161
Brian Curtineb24d742010-04-12 17:16:38 +00001162 os.kill(proc.pid, sig)
1163 self.assertEqual(proc.wait(), sig)
1164
1165 def test_kill_sigterm(self):
1166 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001167 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001168
1169 def test_kill_int(self):
1170 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001171 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001172
1173 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001174 tagname = "test_os_%s" % uuid.uuid1()
1175 m = mmap.mmap(-1, 1, tagname)
1176 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001177 # Run a script which has console control handling enabled.
1178 proc = subprocess.Popen([sys.executable,
1179 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001180 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001181 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1182 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001183 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001184 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001185 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001186 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001187 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001188 count += 1
1189 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001190 # Forcefully kill the process if we weren't able to signal it.
1191 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001192 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001193 os.kill(proc.pid, event)
1194 # proc.send_signal(event) could also be done here.
1195 # Allow time for the signal to be passed and the process to exit.
1196 time.sleep(0.5)
1197 if not proc.poll():
1198 # Forcefully kill the process if we weren't able to signal it.
1199 os.kill(proc.pid, signal.SIGINT)
1200 self.fail("subprocess did not stop on {}".format(name))
1201
1202 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1203 def test_CTRL_C_EVENT(self):
1204 from ctypes import wintypes
1205 import ctypes
1206
1207 # Make a NULL value by creating a pointer with no argument.
1208 NULL = ctypes.POINTER(ctypes.c_int)()
1209 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1210 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1211 wintypes.BOOL)
1212 SetConsoleCtrlHandler.restype = wintypes.BOOL
1213
1214 # Calling this with NULL and FALSE causes the calling process to
1215 # handle CTRL+C, rather than ignore it. This property is inherited
1216 # by subprocesses.
1217 SetConsoleCtrlHandler(NULL, 0)
1218
1219 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1220
1221 def test_CTRL_BREAK_EVENT(self):
1222 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1223
1224
Brian Curtind40e6f72010-07-08 21:39:08 +00001225@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001226@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001227class Win32SymlinkTests(unittest.TestCase):
1228 filelink = 'filelinktest'
1229 filelink_target = os.path.abspath(__file__)
1230 dirlink = 'dirlinktest'
1231 dirlink_target = os.path.dirname(filelink_target)
1232 missing_link = 'missing link'
1233
1234 def setUp(self):
1235 assert os.path.exists(self.dirlink_target)
1236 assert os.path.exists(self.filelink_target)
1237 assert not os.path.exists(self.dirlink)
1238 assert not os.path.exists(self.filelink)
1239 assert not os.path.exists(self.missing_link)
1240
1241 def tearDown(self):
1242 if os.path.exists(self.filelink):
1243 os.remove(self.filelink)
1244 if os.path.exists(self.dirlink):
1245 os.rmdir(self.dirlink)
1246 if os.path.lexists(self.missing_link):
1247 os.remove(self.missing_link)
1248
1249 def test_directory_link(self):
Antoine Pitrou5311c1d2012-01-24 08:59:28 +01001250 os.symlink(self.dirlink_target, self.dirlink, True)
Brian Curtind40e6f72010-07-08 21:39:08 +00001251 self.assertTrue(os.path.exists(self.dirlink))
1252 self.assertTrue(os.path.isdir(self.dirlink))
1253 self.assertTrue(os.path.islink(self.dirlink))
1254 self.check_stat(self.dirlink, self.dirlink_target)
1255
1256 def test_file_link(self):
1257 os.symlink(self.filelink_target, self.filelink)
1258 self.assertTrue(os.path.exists(self.filelink))
1259 self.assertTrue(os.path.isfile(self.filelink))
1260 self.assertTrue(os.path.islink(self.filelink))
1261 self.check_stat(self.filelink, self.filelink_target)
1262
1263 def _create_missing_dir_link(self):
1264 'Create a "directory" link to a non-existent target'
1265 linkname = self.missing_link
1266 if os.path.lexists(linkname):
1267 os.remove(linkname)
1268 target = r'c:\\target does not exist.29r3c740'
1269 assert not os.path.exists(target)
1270 target_is_dir = True
1271 os.symlink(target, linkname, target_is_dir)
1272
1273 def test_remove_directory_link_to_missing_target(self):
1274 self._create_missing_dir_link()
1275 # For compatibility with Unix, os.remove will check the
1276 # directory status and call RemoveDirectory if the symlink
1277 # was created with target_is_dir==True.
1278 os.remove(self.missing_link)
1279
1280 @unittest.skip("currently fails; consider for improvement")
1281 def test_isdir_on_directory_link_to_missing_target(self):
1282 self._create_missing_dir_link()
1283 # consider having isdir return true for directory links
1284 self.assertTrue(os.path.isdir(self.missing_link))
1285
1286 @unittest.skip("currently fails; consider for improvement")
1287 def test_rmdir_on_directory_link_to_missing_target(self):
1288 self._create_missing_dir_link()
1289 # consider allowing rmdir to remove directory links
1290 os.rmdir(self.missing_link)
1291
1292 def check_stat(self, link, target):
1293 self.assertEqual(os.stat(link), os.stat(target))
1294 self.assertNotEqual(os.lstat(link), os.stat(link))
1295
Brian Curtind25aef52011-06-13 15:16:04 -05001296 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001297 with warnings.catch_warnings():
1298 warnings.simplefilter("ignore", DeprecationWarning)
1299 self.assertEqual(os.stat(bytes_link), os.stat(target))
1300 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001301
1302 def test_12084(self):
1303 level1 = os.path.abspath(support.TESTFN)
1304 level2 = os.path.join(level1, "level2")
1305 level3 = os.path.join(level2, "level3")
1306 try:
1307 os.mkdir(level1)
1308 os.mkdir(level2)
1309 os.mkdir(level3)
1310
1311 file1 = os.path.abspath(os.path.join(level1, "file1"))
1312
1313 with open(file1, "w") as f:
1314 f.write("file1")
1315
1316 orig_dir = os.getcwd()
1317 try:
1318 os.chdir(level2)
1319 link = os.path.join(level2, "link")
1320 os.symlink(os.path.relpath(file1), "link")
1321 self.assertIn("link", os.listdir(os.getcwd()))
1322
1323 # Check os.stat calls from the same dir as the link
1324 self.assertEqual(os.stat(file1), os.stat("link"))
1325
1326 # Check os.stat calls from a dir below the link
1327 os.chdir(level1)
1328 self.assertEqual(os.stat(file1),
1329 os.stat(os.path.relpath(link)))
1330
1331 # Check os.stat calls from a dir above the link
1332 os.chdir(level3)
1333 self.assertEqual(os.stat(file1),
1334 os.stat(os.path.relpath(link)))
1335 finally:
1336 os.chdir(orig_dir)
1337 except OSError as err:
1338 self.fail(err)
1339 finally:
1340 os.remove(file1)
1341 shutil.rmtree(level1)
1342
Brian Curtind40e6f72010-07-08 21:39:08 +00001343
Victor Stinnere8d51452010-08-19 01:05:19 +00001344class FSEncodingTests(unittest.TestCase):
1345 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001346 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1347 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001348
Victor Stinnere8d51452010-08-19 01:05:19 +00001349 def test_identity(self):
1350 # assert fsdecode(fsencode(x)) == x
1351 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1352 try:
1353 bytesfn = os.fsencode(fn)
1354 except UnicodeEncodeError:
1355 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001356 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001357
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001358
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001359class PidTests(unittest.TestCase):
1360 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1361 def test_getppid(self):
1362 p = subprocess.Popen([sys.executable, '-c',
1363 'import os; print(os.getppid())'],
1364 stdout=subprocess.PIPE)
1365 stdout, _ = p.communicate()
1366 # We are the parent of our subprocess
1367 self.assertEqual(int(stdout), os.getpid())
1368
1369
Brian Curtin0151b8e2010-09-24 13:43:43 +00001370# The introduction of this TestCase caused at least two different errors on
1371# *nix buildbots. Temporarily skip this to let the buildbots move along.
1372@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001373@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1374class LoginTests(unittest.TestCase):
1375 def test_getlogin(self):
1376 user_name = os.getlogin()
1377 self.assertNotEqual(len(user_name), 0)
1378
1379
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001380@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1381 "needs os.getpriority and os.setpriority")
1382class ProgramPriorityTests(unittest.TestCase):
1383 """Tests for os.getpriority() and os.setpriority()."""
1384
1385 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001386
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001387 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1388 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1389 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001390 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1391 if base >= 19 and new_prio <= 19:
1392 raise unittest.SkipTest(
1393 "unable to reliably test setpriority at current nice level of %s" % base)
1394 else:
1395 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001396 finally:
1397 try:
1398 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1399 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001400 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001401 raise
1402
1403
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001404if threading is not None:
1405 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001406
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001407 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001408
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001409 def __init__(self, conn):
1410 asynchat.async_chat.__init__(self, conn)
1411 self.in_buffer = []
1412 self.closed = False
1413 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001414
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001415 def handle_read(self):
1416 data = self.recv(4096)
1417 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001418
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001419 def get_data(self):
1420 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001421
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001422 def handle_close(self):
1423 self.close()
1424 self.closed = True
1425
1426 def handle_error(self):
1427 raise
1428
1429 def __init__(self, address):
1430 threading.Thread.__init__(self)
1431 asyncore.dispatcher.__init__(self)
1432 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1433 self.bind(address)
1434 self.listen(5)
1435 self.host, self.port = self.socket.getsockname()[:2]
1436 self.handler_instance = None
1437 self._active = False
1438 self._active_lock = threading.Lock()
1439
1440 # --- public API
1441
1442 @property
1443 def running(self):
1444 return self._active
1445
1446 def start(self):
1447 assert not self.running
1448 self.__flag = threading.Event()
1449 threading.Thread.start(self)
1450 self.__flag.wait()
1451
1452 def stop(self):
1453 assert self.running
1454 self._active = False
1455 self.join()
1456
1457 def wait(self):
1458 # wait for handler connection to be closed, then stop the server
1459 while not getattr(self.handler_instance, "closed", False):
1460 time.sleep(0.001)
1461 self.stop()
1462
1463 # --- internals
1464
1465 def run(self):
1466 self._active = True
1467 self.__flag.set()
1468 while self._active and asyncore.socket_map:
1469 self._active_lock.acquire()
1470 asyncore.loop(timeout=0.001, count=1)
1471 self._active_lock.release()
1472 asyncore.close_all()
1473
1474 def handle_accept(self):
1475 conn, addr = self.accept()
1476 self.handler_instance = self.Handler(conn)
1477
1478 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001479 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001480 handle_read = handle_connect
1481
1482 def writable(self):
1483 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001484
1485 def handle_error(self):
1486 raise
1487
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001488
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001489@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001490@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1491class TestSendfile(unittest.TestCase):
1492
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001493 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001494 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001495 not sys.platform.startswith("solaris") and \
1496 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001497
1498 @classmethod
1499 def setUpClass(cls):
1500 with open(support.TESTFN, "wb") as f:
1501 f.write(cls.DATA)
1502
1503 @classmethod
1504 def tearDownClass(cls):
1505 support.unlink(support.TESTFN)
1506
1507 def setUp(self):
1508 self.server = SendfileTestServer((support.HOST, 0))
1509 self.server.start()
1510 self.client = socket.socket()
1511 self.client.connect((self.server.host, self.server.port))
1512 self.client.settimeout(1)
1513 # synchronize by waiting for "220 ready" response
1514 self.client.recv(1024)
1515 self.sockno = self.client.fileno()
1516 self.file = open(support.TESTFN, 'rb')
1517 self.fileno = self.file.fileno()
1518
1519 def tearDown(self):
1520 self.file.close()
1521 self.client.close()
1522 if self.server.running:
1523 self.server.stop()
1524
1525 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1526 """A higher level wrapper representing how an application is
1527 supposed to use sendfile().
1528 """
1529 while 1:
1530 try:
1531 if self.SUPPORT_HEADERS_TRAILERS:
1532 return os.sendfile(sock, file, offset, nbytes, headers,
1533 trailers)
1534 else:
1535 return os.sendfile(sock, file, offset, nbytes)
1536 except OSError as err:
1537 if err.errno == errno.ECONNRESET:
1538 # disconnected
1539 raise
1540 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1541 # we have to retry send data
1542 continue
1543 else:
1544 raise
1545
1546 def test_send_whole_file(self):
1547 # normal send
1548 total_sent = 0
1549 offset = 0
1550 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001551 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001552 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1553 if sent == 0:
1554 break
1555 offset += sent
1556 total_sent += sent
1557 self.assertTrue(sent <= nbytes)
1558 self.assertEqual(offset, total_sent)
1559
1560 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001561 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001562 self.client.close()
1563 self.server.wait()
1564 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001565 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001566 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001567
1568 def test_send_at_certain_offset(self):
1569 # start sending a file at a certain offset
1570 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001571 offset = len(self.DATA) // 2
1572 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001573 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001574 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001575 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1576 if sent == 0:
1577 break
1578 offset += sent
1579 total_sent += sent
1580 self.assertTrue(sent <= nbytes)
1581
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001582 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001583 self.client.close()
1584 self.server.wait()
1585 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001586 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001587 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001588 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001589 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001590
1591 def test_offset_overflow(self):
1592 # specify an offset > file size
1593 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001594 try:
1595 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1596 except OSError as e:
1597 # Solaris can raise EINVAL if offset >= file length, ignore.
1598 if e.errno != errno.EINVAL:
1599 raise
1600 else:
1601 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001602 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001603 self.client.close()
1604 self.server.wait()
1605 data = self.server.handler_instance.get_data()
1606 self.assertEqual(data, b'')
1607
1608 def test_invalid_offset(self):
1609 with self.assertRaises(OSError) as cm:
1610 os.sendfile(self.sockno, self.fileno, -1, 4096)
1611 self.assertEqual(cm.exception.errno, errno.EINVAL)
1612
1613 # --- headers / trailers tests
1614
1615 if SUPPORT_HEADERS_TRAILERS:
1616
1617 def test_headers(self):
1618 total_sent = 0
1619 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1620 headers=[b"x" * 512])
1621 total_sent += sent
1622 offset = 4096
1623 nbytes = 4096
1624 while 1:
1625 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1626 offset, nbytes)
1627 if sent == 0:
1628 break
1629 total_sent += sent
1630 offset += sent
1631
1632 expected_data = b"x" * 512 + self.DATA
1633 self.assertEqual(total_sent, len(expected_data))
1634 self.client.close()
1635 self.server.wait()
1636 data = self.server.handler_instance.get_data()
1637 self.assertEqual(hash(data), hash(expected_data))
1638
1639 def test_trailers(self):
1640 TESTFN2 = support.TESTFN + "2"
Brett Cannonb6376802011-03-15 17:38:22 -04001641 with open(TESTFN2, 'wb') as f:
1642 f.write(b"abcde")
1643 with open(TESTFN2, 'rb')as f:
1644 self.addCleanup(os.remove, TESTFN2)
1645 os.sendfile(self.sockno, f.fileno(), 0, 4096,
1646 trailers=[b"12345"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001647 self.client.close()
1648 self.server.wait()
1649 data = self.server.handler_instance.get_data()
1650 self.assertEqual(data, b"abcde12345")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001651
1652 if hasattr(os, "SF_NODISKIO"):
1653 def test_flags(self):
1654 try:
1655 os.sendfile(self.sockno, self.fileno, 0, 4096,
1656 flags=os.SF_NODISKIO)
1657 except OSError as err:
1658 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1659 raise
1660
1661
Benjamin Peterson799bd802011-08-31 22:15:17 -04001662def supports_extended_attributes():
1663 if not hasattr(os, "setxattr"):
1664 return False
1665 try:
1666 with open(support.TESTFN, "wb") as fp:
1667 try:
1668 os.fsetxattr(fp.fileno(), b"user.test", b"")
1669 except OSError as e:
1670 if e.errno != errno.ENOTSUP:
1671 raise
1672 return False
1673 finally:
1674 support.unlink(support.TESTFN)
1675 # Kernels < 2.6.39 don't respect setxattr flags.
1676 kernel_version = platform.release()
1677 m = re.match("2.6.(\d{1,2})", kernel_version)
1678 return m is None or int(m.group(1)) >= 39
1679
1680
1681@unittest.skipUnless(supports_extended_attributes(),
1682 "no non-broken extended attribute support")
1683class ExtendedAttributeTests(unittest.TestCase):
1684
1685 def tearDown(self):
1686 support.unlink(support.TESTFN)
1687
1688 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
1689 fn = support.TESTFN
1690 open(fn, "wb").close()
1691 with self.assertRaises(OSError) as cm:
1692 getxattr(fn, s("user.test"))
1693 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001694 init_xattr = listxattr(fn)
1695 self.assertIsInstance(init_xattr, list)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001696 setxattr(fn, s("user.test"), b"")
Victor Stinnerf12e5062011-10-16 22:12:03 +02001697 xattr = set(init_xattr)
1698 xattr.add("user.test")
1699 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001700 self.assertEqual(getxattr(fn, b"user.test"), b"")
1701 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE)
1702 self.assertEqual(getxattr(fn, b"user.test"), b"hello")
1703 with self.assertRaises(OSError) as cm:
1704 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE)
1705 self.assertEqual(cm.exception.errno, errno.EEXIST)
1706 with self.assertRaises(OSError) as cm:
1707 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE)
1708 self.assertEqual(cm.exception.errno, errno.ENODATA)
1709 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001710 xattr.add("user.test2")
1711 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001712 removexattr(fn, s("user.test"))
1713 with self.assertRaises(OSError) as cm:
1714 getxattr(fn, s("user.test"))
1715 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001716 xattr.remove("user.test")
1717 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001718 self.assertEqual(getxattr(fn, s("user.test2")), b"foo")
1719 setxattr(fn, s("user.test"), b"a"*1024)
1720 self.assertEqual(getxattr(fn, s("user.test")), b"a"*1024)
1721 removexattr(fn, s("user.test"))
1722 many = sorted("user.test{}".format(i) for i in range(100))
1723 for thing in many:
1724 setxattr(fn, thing, b"x")
Victor Stinnerf12e5062011-10-16 22:12:03 +02001725 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04001726
1727 def _check_xattrs(self, *args):
1728 def make_bytes(s):
1729 return bytes(s, "ascii")
1730 self._check_xattrs_str(str, *args)
1731 support.unlink(support.TESTFN)
1732 self._check_xattrs_str(make_bytes, *args)
1733
1734 def test_simple(self):
1735 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
1736 os.listxattr)
1737
1738 def test_lpath(self):
1739 self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
1740 os.llistxattr)
1741
1742 def test_fds(self):
1743 def getxattr(path, *args):
1744 with open(path, "rb") as fp:
1745 return os.fgetxattr(fp.fileno(), *args)
1746 def setxattr(path, *args):
1747 with open(path, "wb") as fp:
1748 os.fsetxattr(fp.fileno(), *args)
1749 def removexattr(path, *args):
1750 with open(path, "wb") as fp:
1751 os.fremovexattr(fp.fileno(), *args)
1752 def listxattr(path, *args):
1753 with open(path, "rb") as fp:
1754 return os.flistxattr(fp.fileno(), *args)
1755 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1756
1757
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001758@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1759class Win32DeprecatedBytesAPI(unittest.TestCase):
1760 def test_deprecated(self):
1761 import nt
1762 filename = os.fsencode(support.TESTFN)
1763 with warnings.catch_warnings():
1764 warnings.simplefilter("error", DeprecationWarning)
1765 for func, *args in (
1766 (nt._getfullpathname, filename),
1767 (nt._isdir, filename),
1768 (os.access, filename, os.R_OK),
1769 (os.chdir, filename),
1770 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01001771 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001772 (os.link, filename, filename),
1773 (os.listdir, filename),
1774 (os.lstat, filename),
1775 (os.mkdir, filename),
1776 (os.open, filename, os.O_RDONLY),
1777 (os.rename, filename, filename),
1778 (os.rmdir, filename),
1779 (os.startfile, filename),
1780 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001781 (os.unlink, filename),
1782 (os.utime, filename),
1783 ):
1784 self.assertRaises(DeprecationWarning, func, *args)
1785
Victor Stinner28216442011-11-16 00:34:44 +01001786 @support.skip_unless_symlink
1787 def test_symlink(self):
1788 filename = os.fsencode(support.TESTFN)
1789 with warnings.catch_warnings():
1790 warnings.simplefilter("error", DeprecationWarning)
1791 self.assertRaises(DeprecationWarning,
1792 os.symlink, filename, filename)
1793
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001794
Antoine Pitrouf26ad712011-07-15 23:00:56 +02001795@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00001796def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001797 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00001798 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001799 StatAttributeTests,
1800 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001801 WalkTests,
Charles-François Natali7372b062012-02-05 15:15:38 +01001802 FwalkTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001803 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001804 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00001805 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00001806 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001807 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001808 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00001809 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00001810 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001811 Win32KillTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00001812 Win32SymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00001813 FSEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001814 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001815 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00001816 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001817 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001818 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04001819 ExtendedAttributeTests,
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001820 Win32DeprecatedBytesAPI,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001821 )
Fred Drake2e2be372001-09-20 21:33:42 +00001822
1823if __name__ == "__main__":
1824 test_main()