blob: 13dc337e3e5b8a5dc1c6cae85d07251f098ef2c4 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
17import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000018import asyncore
19import asynchat
20import socket
21try:
22 import threading
23except ImportError:
24 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000025
Mark Dickinson7cf03892010-04-16 13:45:35 +000026# Detect whether we're on a Linux system that uses the (now outdated
27# and unmaintained) linuxthreads threading library. There's an issue
28# when combining linuxthreads with a failed execv call: see
29# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020030if hasattr(sys, 'thread_info') and sys.thread_info.version:
31 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
32else:
33 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000034
Thomas Wouters0e3f5912006-08-11 14:57:12 +000035# Tests creating TESTFN
36class FileTests(unittest.TestCase):
37 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000038 if os.path.exists(support.TESTFN):
39 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000040 tearDown = setUp
41
42 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000043 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000044 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000045 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000046
Christian Heimesfdab48e2008-01-20 09:06:41 +000047 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000048 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
49 # We must allocate two consecutive file descriptors, otherwise
50 # it will mess up other file descriptors (perhaps even the three
51 # standard ones).
52 second = os.dup(first)
53 try:
54 retries = 0
55 while second != first + 1:
56 os.close(first)
57 retries += 1
58 if retries > 10:
59 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000060 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000061 first, second = second, os.dup(second)
62 finally:
63 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000064 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000065 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000066 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000067
Benjamin Peterson1cc6df92010-06-30 17:39:45 +000068 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +000069 def test_rename(self):
70 path = support.TESTFN
71 old = sys.getrefcount(path)
72 self.assertRaises(TypeError, os.rename, path, 0)
73 new = sys.getrefcount(path)
74 self.assertEqual(old, new)
75
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000076 def test_read(self):
77 with open(support.TESTFN, "w+b") as fobj:
78 fobj.write(b"spam")
79 fobj.flush()
80 fd = fobj.fileno()
81 os.lseek(fd, 0, 0)
82 s = os.read(fd, 4)
83 self.assertEqual(type(s), bytes)
84 self.assertEqual(s, b"spam")
85
86 def test_write(self):
87 # os.write() accepts bytes- and buffer-like objects but not strings
88 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
89 self.assertRaises(TypeError, os.write, fd, "beans")
90 os.write(fd, b"bacon\n")
91 os.write(fd, bytearray(b"eggs\n"))
92 os.write(fd, memoryview(b"spam\n"))
93 os.close(fd)
94 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +000095 self.assertEqual(fobj.read().splitlines(),
96 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000097
Victor Stinnere0daff12011-03-20 23:36:35 +010098 def write_windows_console(self, *args):
99 retcode = subprocess.call(args,
100 # use a new console to not flood the test output
101 creationflags=subprocess.CREATE_NEW_CONSOLE,
102 # use a shell to hide the console window (SW_HIDE)
103 shell=True)
104 self.assertEqual(retcode, 0)
105
106 @unittest.skipUnless(sys.platform == 'win32',
107 'test specific to the Windows console')
108 def test_write_windows_console(self):
109 # Issue #11395: the Windows console returns an error (12: not enough
110 # space error) on writing into stdout if stdout mode is binary and the
111 # length is greater than 66,000 bytes (or less, depending on heap
112 # usage).
113 code = "print('x' * 100000)"
114 self.write_windows_console(sys.executable, "-c", code)
115 self.write_windows_console(sys.executable, "-u", "-c", code)
116
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000117
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000118class TemporaryFileTests(unittest.TestCase):
119 def setUp(self):
120 self.files = []
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000121 os.mkdir(support.TESTFN)
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000122
123 def tearDown(self):
124 for name in self.files:
125 os.unlink(name)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000126 os.rmdir(support.TESTFN)
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000127
128 def check_tempfile(self, name):
129 # make sure it doesn't already exist:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000130 self.assertFalse(os.path.exists(name),
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000131 "file already exists for temporary file")
132 # make sure we can create the file
133 open(name, "w")
134 self.files.append(name)
135
136 def test_tempnam(self):
137 if not hasattr(os, "tempnam"):
138 return
139 warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
140 r"test_os$")
141 self.check_tempfile(os.tempnam())
142
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000143 name = os.tempnam(support.TESTFN)
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000144 self.check_tempfile(name)
145
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000146 name = os.tempnam(support.TESTFN, "pfx")
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000147 self.assertTrue(os.path.basename(name)[:3] == "pfx")
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000148 self.check_tempfile(name)
149
150 def test_tmpfile(self):
151 if not hasattr(os, "tmpfile"):
152 return
153 # As with test_tmpnam() below, the Windows implementation of tmpfile()
154 # attempts to create a file in the root directory of the current drive.
155 # On Vista and Server 2008, this test will always fail for normal users
156 # as writing to the root directory requires elevated privileges. With
157 # XP and below, the semantics of tmpfile() are the same, but the user
158 # running the test is more likely to have administrative privileges on
159 # their account already. If that's the case, then os.tmpfile() should
160 # work. In order to make this test as useful as possible, rather than
161 # trying to detect Windows versions or whether or not the user has the
162 # right permissions, just try and create a file in the root directory
163 # and see if it raises a 'Permission denied' OSError. If it does, then
164 # test that a subsequent call to os.tmpfile() raises the same error. If
165 # it doesn't, assume we're on XP or below and the user running the test
166 # has administrative privileges, and proceed with the test as normal.
167 if sys.platform == 'win32':
168 name = '\\python_test_os_test_tmpfile.txt'
169 if os.path.exists(name):
170 os.remove(name)
171 try:
172 fp = open(name, 'w')
173 except IOError as first:
174 # open() failed, assert tmpfile() fails in the same way.
175 # Although open() raises an IOError and os.tmpfile() raises an
176 # OSError(), 'args' will be (13, 'Permission denied') in both
177 # cases.
178 try:
179 fp = os.tmpfile()
180 except OSError as second:
181 self.assertEqual(first.args, second.args)
182 else:
183 self.fail("expected os.tmpfile() to raise OSError")
184 return
185 else:
186 # open() worked, therefore, tmpfile() should work. Close our
187 # dummy file and proceed with the test as normal.
188 fp.close()
189 os.remove(name)
190
191 fp = os.tmpfile()
192 fp.write("foobar")
193 fp.seek(0,0)
194 s = fp.read()
195 fp.close()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000196 self.assertTrue(s == "foobar")
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000197
198 def test_tmpnam(self):
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000199 if not hasattr(os, "tmpnam"):
200 return
201 warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
202 r"test_os$")
203 name = os.tmpnam()
204 if sys.platform in ("win32",):
205 # The Windows tmpnam() seems useless. From the MS docs:
206 #
207 # The character string that tmpnam creates consists of
208 # the path prefix, defined by the entry P_tmpdir in the
209 # file STDIO.H, followed by a sequence consisting of the
210 # digit characters '0' through '9'; the numerical value
211 # of this string is in the range 1 - 65,535. Changing the
212 # definitions of L_tmpnam or P_tmpdir in STDIO.H does not
213 # change the operation of tmpnam.
214 #
215 # The really bizarre part is that, at least under MSVC6,
216 # P_tmpdir is "\\". That is, the path returned refers to
217 # the root of the current drive. That's a terrible place to
218 # put temp files, and, depending on privileges, the user
219 # may not even be able to open a file in the root directory.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000220 self.assertFalse(os.path.exists(name),
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000221 "file already exists for temporary file")
222 else:
223 self.check_tempfile(name)
224
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000225 def fdopen_helper(self, *args):
226 fd = os.open(support.TESTFN, os.O_RDONLY)
227 fp2 = os.fdopen(fd, *args)
228 fp2.close()
229
230 def test_fdopen(self):
231 self.fdopen_helper()
232 self.fdopen_helper('r')
233 self.fdopen_helper('r', 100)
234
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000235# Test attributes on return values from os.*stat* family.
236class StatAttributeTests(unittest.TestCase):
237 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000238 os.mkdir(support.TESTFN)
239 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000240 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000241 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000242 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000243
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000244 def tearDown(self):
245 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000246 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000247
Antoine Pitrou38425292010-09-21 18:19:07 +0000248 def check_stat_attributes(self, fname):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249 if not hasattr(os, "stat"):
250 return
251
252 import stat
Antoine Pitrou38425292010-09-21 18:19:07 +0000253 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000254
255 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000256 self.assertEqual(result[stat.ST_SIZE], 3)
257 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000258
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259 # Make sure all the attributes are there
260 members = dir(result)
261 for name in dir(stat):
262 if name[:3] == 'ST_':
263 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000264 if name.endswith("TIME"):
265 def trunc(x): return int(x)
266 else:
267 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000268 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000269 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000270 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000271
272 try:
273 result[200]
274 self.fail("No exception thrown")
275 except IndexError:
276 pass
277
278 # Make sure that assignment fails
279 try:
280 result.st_mode = 1
281 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000282 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000283 pass
284
285 try:
286 result.st_rdev = 1
287 self.fail("No exception thrown")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000288 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000289 pass
290
291 try:
292 result.parrot = 1
293 self.fail("No exception thrown")
294 except AttributeError:
295 pass
296
297 # Use the stat_result constructor with a too-short tuple.
298 try:
299 result2 = os.stat_result((10,))
300 self.fail("No exception thrown")
301 except TypeError:
302 pass
303
Ezio Melotti42da6632011-03-15 05:18:48 +0200304 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000305 try:
306 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
307 except TypeError:
308 pass
309
Antoine Pitrou38425292010-09-21 18:19:07 +0000310 def test_stat_attributes(self):
311 self.check_stat_attributes(self.fname)
312
313 def test_stat_attributes_bytes(self):
314 try:
315 fname = self.fname.encode(sys.getfilesystemencoding())
316 except UnicodeEncodeError:
317 self.skipTest("cannot encode %a for the filesystem" % self.fname)
318 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000319
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000320 def test_statvfs_attributes(self):
321 if not hasattr(os, "statvfs"):
322 return
323
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000324 try:
325 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000326 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000327 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000328 if e.errno == errno.ENOSYS:
329 return
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000330
331 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000332 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000333
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000334 # Make sure all the attributes are there.
335 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
336 'ffree', 'favail', 'flag', 'namemax')
337 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000338 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000339
340 # Make sure that assignment really fails
341 try:
342 result.f_bfree = 1
343 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000344 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000345 pass
346
347 try:
348 result.parrot = 1
349 self.fail("No exception thrown")
350 except AttributeError:
351 pass
352
353 # Use the constructor with a too-short tuple.
354 try:
355 result2 = os.statvfs_result((10,))
356 self.fail("No exception thrown")
357 except TypeError:
358 pass
359
Ezio Melotti42da6632011-03-15 05:18:48 +0200360 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000361 try:
362 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
363 except TypeError:
364 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000365
Thomas Wouters89f507f2006-12-13 04:49:30 +0000366 def test_utime_dir(self):
367 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000368 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000369 # round to int, because some systems may support sub-second
370 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000371 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
372 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000373 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000374
375 # Restrict test to Win32, since there is no guarantee other
376 # systems support centiseconds
377 if sys.platform == 'win32':
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000378 def get_file_system(path):
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000379 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000380 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000381 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000382 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000383 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000384 return buf.value
385
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000386 if get_file_system(support.TESTFN) == "NTFS":
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000387 def test_1565150(self):
388 t1 = 1159195039.25
389 os.utime(self.fname, (t1, t1))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000390 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000391
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000392 def test_large_time(self):
393 t1 = 5000000000 # some day in 2128
394 os.utime(self.fname, (t1, t1))
395 self.assertEqual(os.stat(self.fname).st_mtime, t1)
396
Guido van Rossumd8faa362007-04-27 19:54:29 +0000397 def test_1686475(self):
398 # Verify that an open file can be stat'ed
399 try:
400 os.stat(r"c:\pagefile.sys")
401 except WindowsError as e:
Benjamin Petersonc4fe6f32008-08-19 18:57:56 +0000402 if e.errno == 2: # file does not exist; cannot run test
Guido van Rossumd8faa362007-04-27 19:54:29 +0000403 return
404 self.fail("Could not stat pagefile.sys")
405
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000406from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000407
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000408class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000409 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000410 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000411
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000412 def setUp(self):
413 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000414 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000415 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000416 for key, value in self._reference().items():
417 os.environ[key] = value
418
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000419 def tearDown(self):
420 os.environ.clear()
421 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000422 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000423 os.environb.clear()
424 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000425
Christian Heimes90333392007-11-01 19:08:42 +0000426 def _reference(self):
427 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
428
429 def _empty_mapping(self):
430 os.environ.clear()
431 return os.environ
432
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000433 # Bug 1110478
Martin v. Löwis5510f652005-02-17 21:23:20 +0000434 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000435 os.environ.clear()
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000436 if os.path.exists("/bin/sh"):
437 os.environ.update(HELLO="World")
Brian Curtin810921b2010-10-30 21:24:21 +0000438 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
439 value = popen.read().strip()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000440 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000441
Christian Heimes1a13d592007-11-08 14:16:55 +0000442 def test_os_popen_iter(self):
443 if os.path.exists("/bin/sh"):
Brian Curtin810921b2010-10-30 21:24:21 +0000444 with os.popen(
445 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
446 it = iter(popen)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000447 self.assertEqual(next(it), "line1\n")
448 self.assertEqual(next(it), "line2\n")
449 self.assertEqual(next(it), "line3\n")
Brian Curtin810921b2010-10-30 21:24:21 +0000450 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000451
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000452 # Verify environ keys and values from the OS are of the
453 # correct str type.
454 def test_keyvalue_types(self):
455 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000456 self.assertEqual(type(key), str)
457 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000458
Christian Heimes90333392007-11-01 19:08:42 +0000459 def test_items(self):
460 for key, value in self._reference().items():
461 self.assertEqual(os.environ.get(key), value)
462
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000463 # Issue 7310
464 def test___repr__(self):
465 """Check that the repr() of os.environ looks like environ({...})."""
466 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000467 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
468 '{!r}: {!r}'.format(key, value)
469 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000470
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000471 def test_get_exec_path(self):
472 defpath_list = os.defpath.split(os.pathsep)
473 test_path = ['/monty', '/python', '', '/flying/circus']
474 test_env = {'PATH': os.pathsep.join(test_path)}
475
476 saved_environ = os.environ
477 try:
478 os.environ = dict(test_env)
479 # Test that defaulting to os.environ works.
480 self.assertSequenceEqual(test_path, os.get_exec_path())
481 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
482 finally:
483 os.environ = saved_environ
484
485 # No PATH environment variable
486 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
487 # Empty PATH environment variable
488 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
489 # Supplied PATH environment variable
490 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
491
Victor Stinnerb745a742010-05-18 17:17:23 +0000492 if os.supports_bytes_environ:
493 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000494 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000495 # ignore BytesWarning warning
496 with warnings.catch_warnings(record=True):
497 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000498 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000499 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000500 pass
501 else:
502 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000503
504 # bytes key and/or value
505 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
506 ['abc'])
507 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
508 ['abc'])
509 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
510 ['abc'])
511
512 @unittest.skipUnless(os.supports_bytes_environ,
513 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000514 def test_environb(self):
515 # os.environ -> os.environb
516 value = 'euro\u20ac'
517 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000518 value_bytes = value.encode(sys.getfilesystemencoding(),
519 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000520 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000521 msg = "U+20AC character is not encodable to %s" % (
522 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000523 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000524 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000525 self.assertEqual(os.environ['unicode'], value)
526 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000527
528 # os.environb -> os.environ
529 value = b'\xff'
530 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000531 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000532 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000533 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000534
Tim Petersc4e09402003-04-25 07:11:48 +0000535class WalkTests(unittest.TestCase):
536 """Tests for os.walk()."""
537
538 def test_traversal(self):
539 import os
540 from os.path import join
541
542 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000543 # TESTFN/
544 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000545 # tmp1
546 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000547 # tmp2
548 # SUB11/ no kids
549 # SUB2/ a file kid and a dirsymlink kid
550 # tmp3
551 # link/ a symlink to TESTFN.2
552 # TEST2/
553 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000554 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000555 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000556 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000557 sub2_path = join(walk_path, "SUB2")
558 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000559 tmp2_path = join(sub1_path, "tmp2")
560 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000561 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000562 t2_path = join(support.TESTFN, "TEST2")
563 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Tim Petersc4e09402003-04-25 07:11:48 +0000564
565 # Create stuff.
566 os.makedirs(sub11_path)
567 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000568 os.makedirs(t2_path)
569 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000570 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000571 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
572 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000573 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000574 os.symlink(os.path.abspath(t2_path), link_path)
575 sub2_tree = (sub2_path, ["link"], ["tmp3"])
576 else:
577 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000578
579 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000580 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000581 self.assertEqual(len(all), 4)
582 # We can't know which order SUB1 and SUB2 will appear in.
583 # Not flipped: TESTFN, SUB1, SUB11, SUB2
584 # flipped: TESTFN, SUB2, SUB1, SUB11
585 flipped = all[0][1][0] != "SUB1"
586 all[0][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000587 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000588 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
589 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000590 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000591
592 # Prune the search.
593 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000594 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000595 all.append((root, dirs, files))
596 # Don't descend into SUB1.
597 if 'SUB1' in dirs:
598 # Note that this also mutates the dirs we appended to all!
599 dirs.remove('SUB1')
600 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000601 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
602 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000603
604 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000605 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000606 self.assertEqual(len(all), 4)
607 # We can't know which order SUB1 and SUB2 will appear in.
608 # Not flipped: SUB11, SUB1, SUB2, TESTFN
609 # flipped: SUB2, SUB11, SUB1, TESTFN
610 flipped = all[3][1][0] != "SUB1"
611 all[3][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000612 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000613 self.assertEqual(all[flipped], (sub11_path, [], []))
614 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000615 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000616
Brian Curtin3b4499c2010-12-28 14:31:47 +0000617 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000618 # Walk, following symlinks.
619 for root, dirs, files in os.walk(walk_path, followlinks=True):
620 if root == link_path:
621 self.assertEqual(dirs, [])
622 self.assertEqual(files, ["tmp4"])
623 break
624 else:
625 self.fail("Didn't follow symlink with followlinks=True")
626
627 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000628 # Tear everything down. This is a decent use for bottom-up on
629 # Windows, which doesn't have a recursive delete command. The
630 # (not so) subtlety is that rmdir will fail unless the dir's
631 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000632 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000633 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000634 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000635 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000636 dirname = os.path.join(root, name)
637 if not os.path.islink(dirname):
638 os.rmdir(dirname)
639 else:
640 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000641 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000642
Guido van Rossume7ba4952007-06-06 23:52:48 +0000643class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000644 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000645 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000646
647 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000648 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000649 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
650 os.makedirs(path) # Should work
651 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
652 os.makedirs(path)
653
654 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000655 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000656 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
657 os.makedirs(path)
658 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
659 'dir5', 'dir6')
660 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000661
Terry Reedy5a22b652010-12-02 07:05:56 +0000662 def test_exist_ok_existing_directory(self):
663 path = os.path.join(support.TESTFN, 'dir1')
664 mode = 0o777
665 old_mask = os.umask(0o022)
666 os.makedirs(path, mode)
667 self.assertRaises(OSError, os.makedirs, path, mode)
668 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
669 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
670 os.makedirs(path, mode=mode, exist_ok=True)
671 os.umask(old_mask)
672
673 def test_exist_ok_existing_regular_file(self):
674 base = support.TESTFN
675 path = os.path.join(support.TESTFN, 'dir1')
676 f = open(path, 'w')
677 f.write('abc')
678 f.close()
679 self.assertRaises(OSError, os.makedirs, path)
680 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
681 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
682 os.remove(path)
683
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000684 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000685 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000686 'dir4', 'dir5', 'dir6')
687 # If the tests failed, the bottom-most directory ('../dir6')
688 # may not have been created, so we look for the outermost directory
689 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000690 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000691 path = os.path.dirname(path)
692
693 os.removedirs(path)
694
Guido van Rossume7ba4952007-06-06 23:52:48 +0000695class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000696 def test_devnull(self):
Alex Martelli01c77c62006-08-24 02:58:11 +0000697 f = open(os.devnull, 'w')
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000698 f.write('hello')
699 f.close()
Alex Martelli01c77c62006-08-24 02:58:11 +0000700 f = open(os.devnull, 'r')
Tim Peters4182cfd2004-06-08 20:34:34 +0000701 self.assertEqual(f.read(), '')
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000702 f.close()
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000703
Guido van Rossume7ba4952007-06-06 23:52:48 +0000704class URandomTests(unittest.TestCase):
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000705 def test_urandom(self):
706 try:
707 self.assertEqual(len(os.urandom(1)), 1)
708 self.assertEqual(len(os.urandom(10)), 10)
709 self.assertEqual(len(os.urandom(100)), 100)
710 self.assertEqual(len(os.urandom(1000)), 1000)
711 except NotImplementedError:
712 pass
713
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000714@contextlib.contextmanager
715def _execvpe_mockup(defpath=None):
716 """
717 Stubs out execv and execve functions when used as context manager.
718 Records exec calls. The mock execv and execve functions always raise an
719 exception as they would normally never return.
720 """
721 # A list of tuples containing (function name, first arg, args)
722 # of calls to execv or execve that have been made.
723 calls = []
724
725 def mock_execv(name, *args):
726 calls.append(('execv', name, args))
727 raise RuntimeError("execv called")
728
729 def mock_execve(name, *args):
730 calls.append(('execve', name, args))
731 raise OSError(errno.ENOTDIR, "execve called")
732
733 try:
734 orig_execv = os.execv
735 orig_execve = os.execve
736 orig_defpath = os.defpath
737 os.execv = mock_execv
738 os.execve = mock_execve
739 if defpath is not None:
740 os.defpath = defpath
741 yield calls
742 finally:
743 os.execv = orig_execv
744 os.execve = orig_execve
745 os.defpath = orig_defpath
746
Guido van Rossume7ba4952007-06-06 23:52:48 +0000747class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000748 @unittest.skipIf(USING_LINUXTHREADS,
749 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +0000750 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000751 self.assertRaises(OSError, os.execvpe, 'no such app-',
752 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +0000753
Thomas Heller6790d602007-08-30 17:15:14 +0000754 def test_execvpe_with_bad_arglist(self):
755 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
756
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000757 @unittest.skipUnless(hasattr(os, '_execvpe'),
758 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +0000759 def _test_internal_execvpe(self, test_type):
760 program_path = os.sep + 'absolutepath'
761 if test_type is bytes:
762 program = b'executable'
763 fullpath = os.path.join(os.fsencode(program_path), program)
764 native_fullpath = fullpath
765 arguments = [b'progname', 'arg1', 'arg2']
766 else:
767 program = 'executable'
768 arguments = ['progname', 'arg1', 'arg2']
769 fullpath = os.path.join(program_path, program)
770 if os.name != "nt":
771 native_fullpath = os.fsencode(fullpath)
772 else:
773 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000774 env = {'spam': 'beans'}
775
Victor Stinnerb745a742010-05-18 17:17:23 +0000776 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000777 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000778 self.assertRaises(RuntimeError,
779 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000780 self.assertEqual(len(calls), 1)
781 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
782
Victor Stinnerb745a742010-05-18 17:17:23 +0000783 # test os._execvpe() with a relative path:
784 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000785 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000786 self.assertRaises(OSError,
787 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000788 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +0000789 self.assertSequenceEqual(calls[0],
790 ('execve', native_fullpath, (arguments, env)))
791
792 # test os._execvpe() with a relative path:
793 # os.get_exec_path() reads the 'PATH' variable
794 with _execvpe_mockup() as calls:
795 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +0000796 if test_type is bytes:
797 env_path[b'PATH'] = program_path
798 else:
799 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +0000800 self.assertRaises(OSError,
801 os._execvpe, program, arguments, env=env_path)
802 self.assertEqual(len(calls), 1)
803 self.assertSequenceEqual(calls[0],
804 ('execve', native_fullpath, (arguments, env_path)))
805
806 def test_internal_execvpe_str(self):
807 self._test_internal_execvpe(str)
808 if os.name != "nt":
809 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000810
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000811
Thomas Wouters477c8d52006-05-27 19:21:47 +0000812class Win32ErrorTests(unittest.TestCase):
813 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000814 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000815
816 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000817 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000818
819 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000820 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000821
822 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000823 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +0000824 try:
825 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
826 finally:
827 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000828 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000829
830 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000831 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000832
Thomas Wouters477c8d52006-05-27 19:21:47 +0000833 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +0000834 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000835
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000836class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000837 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000838 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
839 #singles.append("close")
840 #We omit close because it doesn'r raise an exception on some platforms
841 def get_single(f):
842 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000843 if hasattr(os, f):
844 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000845 return helper
846 for f in singles:
847 locals()["test_"+f] = get_single(f)
848
Benjamin Peterson7522c742009-01-19 21:00:09 +0000849 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000850 try:
851 f(support.make_bad_fd(), *args)
852 except OSError as e:
853 self.assertEqual(e.errno, errno.EBADF)
854 else:
855 self.fail("%r didn't raise a OSError with a bad file descriptor"
856 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000857
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000858 def test_isatty(self):
859 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000860 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000861
862 def test_closerange(self):
863 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000864 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000865 # Make sure none of the descriptors we are about to close are
866 # currently valid (issue 6542).
867 for i in range(10):
868 try: os.fstat(fd+i)
869 except OSError:
870 pass
871 else:
872 break
873 if i < 2:
874 raise unittest.SkipTest(
875 "Unable to acquire a range of invalid file descriptors")
876 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000877
878 def test_dup2(self):
879 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000880 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000881
882 def test_fchmod(self):
883 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000884 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000885
886 def test_fchown(self):
887 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000888 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000889
890 def test_fpathconf(self):
891 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000892 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000893
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000894 def test_ftruncate(self):
895 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000896 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000897
898 def test_lseek(self):
899 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000900 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000901
902 def test_read(self):
903 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000904 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000905
906 def test_tcsetpgrpt(self):
907 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000908 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000909
910 def test_write(self):
911 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000912 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000913
Brian Curtin1b9df392010-11-24 20:24:31 +0000914
915class LinkTests(unittest.TestCase):
916 def setUp(self):
917 self.file1 = support.TESTFN
918 self.file2 = os.path.join(support.TESTFN + "2")
919
Brian Curtinc0abc4e2010-11-30 23:46:54 +0000920 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +0000921 for file in (self.file1, self.file2):
922 if os.path.exists(file):
923 os.unlink(file)
924
Brian Curtin1b9df392010-11-24 20:24:31 +0000925 def _test_link(self, file1, file2):
926 with open(file1, "w") as f1:
927 f1.write("test")
928
929 os.link(file1, file2)
930 with open(file1, "r") as f1, open(file2, "r") as f2:
931 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
932
933 def test_link(self):
934 self._test_link(self.file1, self.file2)
935
936 def test_link_bytes(self):
937 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
938 bytes(self.file2, sys.getfilesystemencoding()))
939
Brian Curtinf498b752010-11-30 15:54:04 +0000940 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +0000941 try:
Brian Curtinf498b752010-11-30 15:54:04 +0000942 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +0000943 except UnicodeError:
944 raise unittest.SkipTest("Unable to encode for this platform.")
945
Brian Curtinf498b752010-11-30 15:54:04 +0000946 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +0000947 self.file2 = self.file1 + "2"
948 self._test_link(self.file1, self.file2)
949
Thomas Wouters477c8d52006-05-27 19:21:47 +0000950if sys.platform != 'win32':
951 class Win32ErrorTests(unittest.TestCase):
952 pass
953
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000954 class PosixUidGidTests(unittest.TestCase):
955 if hasattr(os, 'setuid'):
956 def test_setuid(self):
957 if os.getuid() != 0:
958 self.assertRaises(os.error, os.setuid, 0)
959 self.assertRaises(OverflowError, os.setuid, 1<<32)
960
961 if hasattr(os, 'setgid'):
962 def test_setgid(self):
963 if os.getuid() != 0:
964 self.assertRaises(os.error, os.setgid, 0)
965 self.assertRaises(OverflowError, os.setgid, 1<<32)
966
967 if hasattr(os, 'seteuid'):
968 def test_seteuid(self):
969 if os.getuid() != 0:
970 self.assertRaises(os.error, os.seteuid, 0)
971 self.assertRaises(OverflowError, os.seteuid, 1<<32)
972
973 if hasattr(os, 'setegid'):
974 def test_setegid(self):
975 if os.getuid() != 0:
976 self.assertRaises(os.error, os.setegid, 0)
977 self.assertRaises(OverflowError, os.setegid, 1<<32)
978
979 if hasattr(os, 'setreuid'):
980 def test_setreuid(self):
981 if os.getuid() != 0:
982 self.assertRaises(os.error, os.setreuid, 0, 0)
983 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
984 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000985
986 def test_setreuid_neg1(self):
987 # Needs to accept -1. We run this in a subprocess to avoid
988 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000989 subprocess.check_call([
990 sys.executable, '-c',
991 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000992
993 if hasattr(os, 'setregid'):
994 def test_setregid(self):
995 if os.getuid() != 0:
996 self.assertRaises(os.error, os.setregid, 0, 0)
997 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
998 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000999
1000 def test_setregid_neg1(self):
1001 # Needs to accept -1. We run this in a subprocess to avoid
1002 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001003 subprocess.check_call([
1004 sys.executable, '-c',
1005 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001006
1007 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001008 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001009 if support.TESTFN_UNENCODABLE:
1010 self.dir = support.TESTFN_UNENCODABLE
1011 else:
1012 self.dir = support.TESTFN
1013 self.bdir = os.fsencode(self.dir)
1014
1015 bytesfn = []
1016 def add_filename(fn):
1017 try:
1018 fn = os.fsencode(fn)
1019 except UnicodeEncodeError:
1020 return
1021 bytesfn.append(fn)
1022 add_filename(support.TESTFN_UNICODE)
1023 if support.TESTFN_UNENCODABLE:
1024 add_filename(support.TESTFN_UNENCODABLE)
1025 if not bytesfn:
1026 self.skipTest("couldn't create any non-ascii filename")
1027
1028 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001029 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001030 try:
1031 for fn in bytesfn:
1032 f = open(os.path.join(self.bdir, fn), "w")
1033 f.close()
Victor Stinnere8d51452010-08-19 01:05:19 +00001034 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001035 if fn in self.unicodefn:
1036 raise ValueError("duplicate filename")
1037 self.unicodefn.add(fn)
1038 except:
1039 shutil.rmtree(self.dir)
1040 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001041
1042 def tearDown(self):
1043 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001044
1045 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001046 expected = self.unicodefn
1047 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001048 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001049
1050 def test_open(self):
1051 for fn in self.unicodefn:
1052 f = open(os.path.join(self.dir, fn))
1053 f.close()
1054
1055 def test_stat(self):
1056 for fn in self.unicodefn:
1057 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001058else:
1059 class PosixUidGidTests(unittest.TestCase):
1060 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001061 class Pep383Tests(unittest.TestCase):
1062 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001063
Brian Curtineb24d742010-04-12 17:16:38 +00001064@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1065class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001066 def _kill(self, sig):
1067 # Start sys.executable as a subprocess and communicate from the
1068 # subprocess to the parent that the interpreter is ready. When it
1069 # becomes ready, send *sig* via os.kill to the subprocess and check
1070 # that the return code is equal to *sig*.
1071 import ctypes
1072 from ctypes import wintypes
1073 import msvcrt
1074
1075 # Since we can't access the contents of the process' stdout until the
1076 # process has exited, use PeekNamedPipe to see what's inside stdout
1077 # without waiting. This is done so we can tell that the interpreter
1078 # is started and running at a point where it could handle a signal.
1079 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1080 PeekNamedPipe.restype = wintypes.BOOL
1081 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1082 ctypes.POINTER(ctypes.c_char), # stdout buf
1083 wintypes.DWORD, # Buffer size
1084 ctypes.POINTER(wintypes.DWORD), # bytes read
1085 ctypes.POINTER(wintypes.DWORD), # bytes avail
1086 ctypes.POINTER(wintypes.DWORD)) # bytes left
1087 msg = "running"
1088 proc = subprocess.Popen([sys.executable, "-c",
1089 "import sys;"
1090 "sys.stdout.write('{}');"
1091 "sys.stdout.flush();"
1092 "input()".format(msg)],
1093 stdout=subprocess.PIPE,
1094 stderr=subprocess.PIPE,
1095 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001096 self.addCleanup(proc.stdout.close)
1097 self.addCleanup(proc.stderr.close)
1098 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001099
1100 count, max = 0, 100
1101 while count < max and proc.poll() is None:
1102 # Create a string buffer to store the result of stdout from the pipe
1103 buf = ctypes.create_string_buffer(len(msg))
1104 # Obtain the text currently in proc.stdout
1105 # Bytes read/avail/left are left as NULL and unused
1106 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1107 buf, ctypes.sizeof(buf), None, None, None)
1108 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1109 if buf.value:
1110 self.assertEqual(msg, buf.value.decode())
1111 break
1112 time.sleep(0.1)
1113 count += 1
1114 else:
1115 self.fail("Did not receive communication from the subprocess")
1116
Brian Curtineb24d742010-04-12 17:16:38 +00001117 os.kill(proc.pid, sig)
1118 self.assertEqual(proc.wait(), sig)
1119
1120 def test_kill_sigterm(self):
1121 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001122 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001123
1124 def test_kill_int(self):
1125 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001126 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001127
1128 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001129 tagname = "test_os_%s" % uuid.uuid1()
1130 m = mmap.mmap(-1, 1, tagname)
1131 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001132 # Run a script which has console control handling enabled.
1133 proc = subprocess.Popen([sys.executable,
1134 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001135 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001136 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1137 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001138 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001139 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001140 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001141 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001142 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001143 count += 1
1144 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001145 # Forcefully kill the process if we weren't able to signal it.
1146 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001147 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001148 os.kill(proc.pid, event)
1149 # proc.send_signal(event) could also be done here.
1150 # Allow time for the signal to be passed and the process to exit.
1151 time.sleep(0.5)
1152 if not proc.poll():
1153 # Forcefully kill the process if we weren't able to signal it.
1154 os.kill(proc.pid, signal.SIGINT)
1155 self.fail("subprocess did not stop on {}".format(name))
1156
1157 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1158 def test_CTRL_C_EVENT(self):
1159 from ctypes import wintypes
1160 import ctypes
1161
1162 # Make a NULL value by creating a pointer with no argument.
1163 NULL = ctypes.POINTER(ctypes.c_int)()
1164 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1165 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1166 wintypes.BOOL)
1167 SetConsoleCtrlHandler.restype = wintypes.BOOL
1168
1169 # Calling this with NULL and FALSE causes the calling process to
1170 # handle CTRL+C, rather than ignore it. This property is inherited
1171 # by subprocesses.
1172 SetConsoleCtrlHandler(NULL, 0)
1173
1174 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1175
1176 def test_CTRL_BREAK_EVENT(self):
1177 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1178
1179
Brian Curtind40e6f72010-07-08 21:39:08 +00001180@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001181@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001182class Win32SymlinkTests(unittest.TestCase):
1183 filelink = 'filelinktest'
1184 filelink_target = os.path.abspath(__file__)
1185 dirlink = 'dirlinktest'
1186 dirlink_target = os.path.dirname(filelink_target)
1187 missing_link = 'missing link'
1188
1189 def setUp(self):
1190 assert os.path.exists(self.dirlink_target)
1191 assert os.path.exists(self.filelink_target)
1192 assert not os.path.exists(self.dirlink)
1193 assert not os.path.exists(self.filelink)
1194 assert not os.path.exists(self.missing_link)
1195
1196 def tearDown(self):
1197 if os.path.exists(self.filelink):
1198 os.remove(self.filelink)
1199 if os.path.exists(self.dirlink):
1200 os.rmdir(self.dirlink)
1201 if os.path.lexists(self.missing_link):
1202 os.remove(self.missing_link)
1203
1204 def test_directory_link(self):
1205 os.symlink(self.dirlink_target, self.dirlink)
1206 self.assertTrue(os.path.exists(self.dirlink))
1207 self.assertTrue(os.path.isdir(self.dirlink))
1208 self.assertTrue(os.path.islink(self.dirlink))
1209 self.check_stat(self.dirlink, self.dirlink_target)
1210
1211 def test_file_link(self):
1212 os.symlink(self.filelink_target, self.filelink)
1213 self.assertTrue(os.path.exists(self.filelink))
1214 self.assertTrue(os.path.isfile(self.filelink))
1215 self.assertTrue(os.path.islink(self.filelink))
1216 self.check_stat(self.filelink, self.filelink_target)
1217
1218 def _create_missing_dir_link(self):
1219 'Create a "directory" link to a non-existent target'
1220 linkname = self.missing_link
1221 if os.path.lexists(linkname):
1222 os.remove(linkname)
1223 target = r'c:\\target does not exist.29r3c740'
1224 assert not os.path.exists(target)
1225 target_is_dir = True
1226 os.symlink(target, linkname, target_is_dir)
1227
1228 def test_remove_directory_link_to_missing_target(self):
1229 self._create_missing_dir_link()
1230 # For compatibility with Unix, os.remove will check the
1231 # directory status and call RemoveDirectory if the symlink
1232 # was created with target_is_dir==True.
1233 os.remove(self.missing_link)
1234
1235 @unittest.skip("currently fails; consider for improvement")
1236 def test_isdir_on_directory_link_to_missing_target(self):
1237 self._create_missing_dir_link()
1238 # consider having isdir return true for directory links
1239 self.assertTrue(os.path.isdir(self.missing_link))
1240
1241 @unittest.skip("currently fails; consider for improvement")
1242 def test_rmdir_on_directory_link_to_missing_target(self):
1243 self._create_missing_dir_link()
1244 # consider allowing rmdir to remove directory links
1245 os.rmdir(self.missing_link)
1246
1247 def check_stat(self, link, target):
1248 self.assertEqual(os.stat(link), os.stat(target))
1249 self.assertNotEqual(os.lstat(link), os.stat(link))
1250
Brian Curtind25aef52011-06-13 15:16:04 -05001251 bytes_link = os.fsencode(link)
1252 self.assertEqual(os.stat(bytes_link), os.stat(target))
1253 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
1254
1255 def test_12084(self):
1256 level1 = os.path.abspath(support.TESTFN)
1257 level2 = os.path.join(level1, "level2")
1258 level3 = os.path.join(level2, "level3")
1259 try:
1260 os.mkdir(level1)
1261 os.mkdir(level2)
1262 os.mkdir(level3)
1263
1264 file1 = os.path.abspath(os.path.join(level1, "file1"))
1265
1266 with open(file1, "w") as f:
1267 f.write("file1")
1268
1269 orig_dir = os.getcwd()
1270 try:
1271 os.chdir(level2)
1272 link = os.path.join(level2, "link")
1273 os.symlink(os.path.relpath(file1), "link")
1274 self.assertIn("link", os.listdir(os.getcwd()))
1275
1276 # Check os.stat calls from the same dir as the link
1277 self.assertEqual(os.stat(file1), os.stat("link"))
1278
1279 # Check os.stat calls from a dir below the link
1280 os.chdir(level1)
1281 self.assertEqual(os.stat(file1),
1282 os.stat(os.path.relpath(link)))
1283
1284 # Check os.stat calls from a dir above the link
1285 os.chdir(level3)
1286 self.assertEqual(os.stat(file1),
1287 os.stat(os.path.relpath(link)))
1288 finally:
1289 os.chdir(orig_dir)
1290 except OSError as err:
1291 self.fail(err)
1292 finally:
1293 os.remove(file1)
1294 shutil.rmtree(level1)
1295
Brian Curtind40e6f72010-07-08 21:39:08 +00001296
Victor Stinnere8d51452010-08-19 01:05:19 +00001297class FSEncodingTests(unittest.TestCase):
1298 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001299 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1300 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001301
Victor Stinnere8d51452010-08-19 01:05:19 +00001302 def test_identity(self):
1303 # assert fsdecode(fsencode(x)) == x
1304 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1305 try:
1306 bytesfn = os.fsencode(fn)
1307 except UnicodeEncodeError:
1308 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001309 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001310
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001311
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001312class PidTests(unittest.TestCase):
1313 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1314 def test_getppid(self):
1315 p = subprocess.Popen([sys.executable, '-c',
1316 'import os; print(os.getppid())'],
1317 stdout=subprocess.PIPE)
1318 stdout, _ = p.communicate()
1319 # We are the parent of our subprocess
1320 self.assertEqual(int(stdout), os.getpid())
1321
1322
Brian Curtin0151b8e2010-09-24 13:43:43 +00001323# The introduction of this TestCase caused at least two different errors on
1324# *nix buildbots. Temporarily skip this to let the buildbots move along.
1325@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001326@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1327class LoginTests(unittest.TestCase):
1328 def test_getlogin(self):
1329 user_name = os.getlogin()
1330 self.assertNotEqual(len(user_name), 0)
1331
1332
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001333@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1334 "needs os.getpriority and os.setpriority")
1335class ProgramPriorityTests(unittest.TestCase):
1336 """Tests for os.getpriority() and os.setpriority()."""
1337
1338 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001339
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001340 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1341 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1342 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001343 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1344 if base >= 19 and new_prio <= 19:
1345 raise unittest.SkipTest(
1346 "unable to reliably test setpriority at current nice level of %s" % base)
1347 else:
1348 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001349 finally:
1350 try:
1351 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1352 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001353 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001354 raise
1355
1356
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001357if threading is not None:
1358 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001359
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001360 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001361
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001362 def __init__(self, conn):
1363 asynchat.async_chat.__init__(self, conn)
1364 self.in_buffer = []
1365 self.closed = False
1366 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001367
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001368 def handle_read(self):
1369 data = self.recv(4096)
1370 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001371
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001372 def get_data(self):
1373 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001374
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001375 def handle_close(self):
1376 self.close()
1377 self.closed = True
1378
1379 def handle_error(self):
1380 raise
1381
1382 def __init__(self, address):
1383 threading.Thread.__init__(self)
1384 asyncore.dispatcher.__init__(self)
1385 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1386 self.bind(address)
1387 self.listen(5)
1388 self.host, self.port = self.socket.getsockname()[:2]
1389 self.handler_instance = None
1390 self._active = False
1391 self._active_lock = threading.Lock()
1392
1393 # --- public API
1394
1395 @property
1396 def running(self):
1397 return self._active
1398
1399 def start(self):
1400 assert not self.running
1401 self.__flag = threading.Event()
1402 threading.Thread.start(self)
1403 self.__flag.wait()
1404
1405 def stop(self):
1406 assert self.running
1407 self._active = False
1408 self.join()
1409
1410 def wait(self):
1411 # wait for handler connection to be closed, then stop the server
1412 while not getattr(self.handler_instance, "closed", False):
1413 time.sleep(0.001)
1414 self.stop()
1415
1416 # --- internals
1417
1418 def run(self):
1419 self._active = True
1420 self.__flag.set()
1421 while self._active and asyncore.socket_map:
1422 self._active_lock.acquire()
1423 asyncore.loop(timeout=0.001, count=1)
1424 self._active_lock.release()
1425 asyncore.close_all()
1426
1427 def handle_accept(self):
1428 conn, addr = self.accept()
1429 self.handler_instance = self.Handler(conn)
1430
1431 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001432 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001433 handle_read = handle_connect
1434
1435 def writable(self):
1436 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001437
1438 def handle_error(self):
1439 raise
1440
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001441
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001442@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001443@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1444class TestSendfile(unittest.TestCase):
1445
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001446 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001447 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001448 not sys.platform.startswith("solaris") and \
1449 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001450
1451 @classmethod
1452 def setUpClass(cls):
1453 with open(support.TESTFN, "wb") as f:
1454 f.write(cls.DATA)
1455
1456 @classmethod
1457 def tearDownClass(cls):
1458 support.unlink(support.TESTFN)
1459
1460 def setUp(self):
1461 self.server = SendfileTestServer((support.HOST, 0))
1462 self.server.start()
1463 self.client = socket.socket()
1464 self.client.connect((self.server.host, self.server.port))
1465 self.client.settimeout(1)
1466 # synchronize by waiting for "220 ready" response
1467 self.client.recv(1024)
1468 self.sockno = self.client.fileno()
1469 self.file = open(support.TESTFN, 'rb')
1470 self.fileno = self.file.fileno()
1471
1472 def tearDown(self):
1473 self.file.close()
1474 self.client.close()
1475 if self.server.running:
1476 self.server.stop()
1477
1478 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1479 """A higher level wrapper representing how an application is
1480 supposed to use sendfile().
1481 """
1482 while 1:
1483 try:
1484 if self.SUPPORT_HEADERS_TRAILERS:
1485 return os.sendfile(sock, file, offset, nbytes, headers,
1486 trailers)
1487 else:
1488 return os.sendfile(sock, file, offset, nbytes)
1489 except OSError as err:
1490 if err.errno == errno.ECONNRESET:
1491 # disconnected
1492 raise
1493 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1494 # we have to retry send data
1495 continue
1496 else:
1497 raise
1498
1499 def test_send_whole_file(self):
1500 # normal send
1501 total_sent = 0
1502 offset = 0
1503 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001504 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001505 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1506 if sent == 0:
1507 break
1508 offset += sent
1509 total_sent += sent
1510 self.assertTrue(sent <= nbytes)
1511 self.assertEqual(offset, total_sent)
1512
1513 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001514 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001515 self.client.close()
1516 self.server.wait()
1517 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001518 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001519 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001520
1521 def test_send_at_certain_offset(self):
1522 # start sending a file at a certain offset
1523 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001524 offset = len(self.DATA) // 2
1525 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001526 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001527 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001528 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1529 if sent == 0:
1530 break
1531 offset += sent
1532 total_sent += sent
1533 self.assertTrue(sent <= nbytes)
1534
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001535 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001536 self.client.close()
1537 self.server.wait()
1538 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001539 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001540 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001541 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001542 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001543
1544 def test_offset_overflow(self):
1545 # specify an offset > file size
1546 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001547 try:
1548 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1549 except OSError as e:
1550 # Solaris can raise EINVAL if offset >= file length, ignore.
1551 if e.errno != errno.EINVAL:
1552 raise
1553 else:
1554 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001555 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001556 self.client.close()
1557 self.server.wait()
1558 data = self.server.handler_instance.get_data()
1559 self.assertEqual(data, b'')
1560
1561 def test_invalid_offset(self):
1562 with self.assertRaises(OSError) as cm:
1563 os.sendfile(self.sockno, self.fileno, -1, 4096)
1564 self.assertEqual(cm.exception.errno, errno.EINVAL)
1565
1566 # --- headers / trailers tests
1567
1568 if SUPPORT_HEADERS_TRAILERS:
1569
1570 def test_headers(self):
1571 total_sent = 0
1572 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1573 headers=[b"x" * 512])
1574 total_sent += sent
1575 offset = 4096
1576 nbytes = 4096
1577 while 1:
1578 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1579 offset, nbytes)
1580 if sent == 0:
1581 break
1582 total_sent += sent
1583 offset += sent
1584
1585 expected_data = b"x" * 512 + self.DATA
1586 self.assertEqual(total_sent, len(expected_data))
1587 self.client.close()
1588 self.server.wait()
1589 data = self.server.handler_instance.get_data()
1590 self.assertEqual(hash(data), hash(expected_data))
1591
1592 def test_trailers(self):
1593 TESTFN2 = support.TESTFN + "2"
Brett Cannonb6376802011-03-15 17:38:22 -04001594 with open(TESTFN2, 'wb') as f:
1595 f.write(b"abcde")
1596 with open(TESTFN2, 'rb')as f:
1597 self.addCleanup(os.remove, TESTFN2)
1598 os.sendfile(self.sockno, f.fileno(), 0, 4096,
1599 trailers=[b"12345"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001600 self.client.close()
1601 self.server.wait()
1602 data = self.server.handler_instance.get_data()
1603 self.assertEqual(data, b"abcde12345")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001604
1605 if hasattr(os, "SF_NODISKIO"):
1606 def test_flags(self):
1607 try:
1608 os.sendfile(self.sockno, self.fileno, 0, 4096,
1609 flags=os.SF_NODISKIO)
1610 except OSError as err:
1611 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1612 raise
1613
1614
Fred Drake2e2be372001-09-20 21:33:42 +00001615def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001616 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00001617 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001618 StatAttributeTests,
1619 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001620 WalkTests,
1621 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001622 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00001623 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00001624 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001625 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001626 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00001627 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00001628 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001629 Win32KillTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00001630 Win32SymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00001631 FSEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001632 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001633 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00001634 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001635 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001636 ProgramPriorityTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001637 )
Fred Drake2e2be372001-09-20 21:33:42 +00001638
1639if __name__ == "__main__":
1640 test_main()