blob: aa9ff5dfe9e7f3f008170c731dcb8c6799bdab64 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
17import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000018import asyncore
19import asynchat
20import socket
21try:
22 import threading
23except ImportError:
24 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000025
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
#
# The flag is False whenever sys.thread_info is missing or reports no
# version string; otherwise it reflects the "linuxthreads" prefix check.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))
Brian Curtineb24d742010-04-12 17:16:38 +000034
Thomas Wouters0e3f5912006-08-11 14:57:12 +000035# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Low-level file descriptor tests that create and remove TESTFN."""

    def setUp(self):
        # Start from a clean slate: drop any TESTFN left behind.  The very
        # same routine doubles as tearDown (see the alias below).
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # Create the file, close the descriptor, then query write access.
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        writable = os.access(support.TESTFN, os.W_OK)
        self.assertTrue(writable)

    def test_closerange(self):
        low = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        high = os.dup(low)
        try:
            attempts = 0
            while high != low + 1:
                # Not adjacent: give up the lower fd and dup again.
                os.close(low)
                attempts += 1
                if attempts > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                low, high = high, os.dup(high)
        finally:
            os.close(high)
        # close a fd that is open, and one that isn't
        os.closerange(low, low + 2)
        self.assertRaises(OSError, os.write, low, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must leave the refcount of its
        # path argument unchanged.
        path = support.TESTFN
        before = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        after = sys.getrefcount(path)
        self.assertEqual(before, after)

    def test_read(self):
        # Write through the file object, then read the same bytes back
        # through the raw descriptor.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            data = os.read(fd, 4)
            self.assertEqual(type(data), bytes)
            self.assertEqual(data, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        self.assertRaises(TypeError, os.write, fd, "beans")
        for chunk in (b"bacon\n",
                      bytearray(b"eggs\n"),
                      memoryview(b"spam\n")):
            os.write(fd, chunk)
        os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            written = fobj.read().splitlines()
        self.assertEqual(written, [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run *args* as a command and require a zero exit status.
        exit_status = subprocess.call(
            args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(exit_status, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        script = "print('x' * 100000)"
        # Once with default buffering, once unbuffered (-u).
        self.write_windows_console(sys.executable, "-c", script)
        self.write_windows_console(sys.executable, "-u", "-c", script)
116
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000117
class TemporaryFileTests(unittest.TestCase):
    """Tests for the legacy temporary-name helpers and for os.fdopen().

    setUp() creates TESTFN as a directory; tearDown() unlinks every file
    recorded in self.files and then removes that directory.
    """

    def setUp(self):
        self.files = []
        os.mkdir(support.TESTFN)

    def tearDown(self):
        # Recorded files may live inside TESTFN, so remove them before the
        # directory itself.
        for name in self.files:
            os.unlink(name)
        os.rmdir(support.TESTFN)

    def check_tempfile(self, name):
        # make sure it doesn't already exist:
        self.assertFalse(os.path.exists(name),
                         "file already exists for temporary file")
        # make sure we can create the file; close it immediately so the
        # handle is not leaked and tearDown() can unlink the file
        open(name, "w").close()
        self.files.append(name)

    @unittest.skipUnless(hasattr(os, "tempnam"), "requires os.tempnam()")
    def test_tempnam(self):
        # Install the "ignore" filter inside catch_warnings() so that the
        # global warning filters are restored when the test finishes.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
                                    r"test_os$")
            self.check_tempfile(os.tempnam())

            name = os.tempnam(support.TESTFN)
            self.check_tempfile(name)

            name = os.tempnam(support.TESTFN, "pfx")
            self.assertEqual(os.path.basename(name)[:3], "pfx")
            self.check_tempfile(name)

    @unittest.skipUnless(hasattr(os, "tmpfile"), "requires os.tmpfile()")
    def test_tmpfile(self):
        # As with test_tmpnam() below, the Windows implementation of tmpfile()
        # attempts to create a file in the root directory of the current drive.
        # On Vista and Server 2008, this test will always fail for normal users
        # as writing to the root directory requires elevated privileges.  With
        # XP and below, the semantics of tmpfile() are the same, but the user
        # running the test is more likely to have administrative privileges on
        # their account already.  If that's the case, then os.tmpfile() should
        # work.  In order to make this test as useful as possible, rather than
        # trying to detect Windows versions or whether or not the user has the
        # right permissions, just try and create a file in the root directory
        # and see if it raises a 'Permission denied' OSError.  If it does, then
        # test that a subsequent call to os.tmpfile() raises the same error. If
        # it doesn't, assume we're on XP or below and the user running the test
        # has administrative privileges, and proceed with the test as normal.
        if sys.platform == 'win32':
            name = '\\python_test_os_test_tmpfile.txt'
            if os.path.exists(name):
                os.remove(name)
            try:
                fp = open(name, 'w')
            except IOError as first:
                # open() failed, assert tmpfile() fails in the same way.
                # Although open() raises an IOError and os.tmpfile() raises an
                # OSError(), 'args' will be (13, 'Permission denied') in both
                # cases.
                try:
                    fp = os.tmpfile()
                except OSError as second:
                    self.assertEqual(first.args, second.args)
                else:
                    # Do not leak the unexpected file before failing.
                    fp.close()
                    self.fail("expected os.tmpfile() to raise OSError")
                return
            else:
                # open() worked, therefore, tmpfile() should work.  Close our
                # dummy file and proceed with the test as normal.
                fp.close()
                os.remove(name)

        fp = os.tmpfile()
        try:
            fp.write("foobar")
            fp.seek(0, 0)
            s = fp.read()
        finally:
            fp.close()
        self.assertEqual(s, "foobar")

    @unittest.skipUnless(hasattr(os, "tmpnam"), "requires os.tmpnam()")
    def test_tmpnam(self):
        # Keep the warning filter local to this test (see test_tempnam).
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
                                    r"test_os$")
            name = os.tmpnam()
        if sys.platform in ("win32",):
            # The Windows tmpnam() seems useless.  From the MS docs:
            #
            #     The character string that tmpnam creates consists of
            #     the path prefix, defined by the entry P_tmpdir in the
            #     file STDIO.H, followed by a sequence consisting of the
            #     digit characters '0' through '9'; the numerical value
            #     of this string is in the range 1 - 65,535.  Changing the
            #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not
            #     change the operation of tmpnam.
            #
            # The really bizarre part is that, at least under MSVC6,
            # P_tmpdir is "\\".  That is, the path returned refers to
            # the root of the current drive.  That's a terrible place to
            # put temp files, and, depending on privileges, the user
            # may not even be able to open a file in the root directory.
            self.assertFalse(os.path.exists(name),
                             "file already exists for temporary file")
        else:
            self.check_tempfile(name)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only and wrap the descriptor with os.fdopen(),
        # forwarding *args* (mode, buffering) unchanged.
        # NOTE(review): setUp() creates TESTFN as a directory, so this wraps
        # a directory descriptor -- confirm that is what is intended.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        fp2 = os.fdopen(fd, *args)
        fp2.close()

    def test_fdopen(self):
        # Exercise fdopen() with no extra arguments, a mode, and a mode
        # plus buffer size.
        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)
234
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000235# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Check the result objects returned by os.stat() and os.statvfs().

    setUp() creates the directory TESTFN containing a three-byte file
    "f1" (self.fname); tearDown() removes both.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        # Shared body for the str and bytes variants below: *fname* is the
        # path handed to os.stat().
        if not hasattr(os, "stat"):
            return

        import stat
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                # *TIME attributes are truncated to int before being
                # compared with the corresponding tuple slot.
                if name.endswith("TIME"):
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                 result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Indexing past the end must fail.
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # A TypeError is tolerated here but not required.
        # NOTE(review): presumably because the accepted length varies by
        # platform -- confirm.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        # Same checks, but with the path encoded to bytes.
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            return

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                return
            # Any other error is a real failure; without re-raising, the
            # code below would die with a NameError on 'result' instead.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # As in check_stat_attributes(), a TypeError is tolerated but not
        # required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        # Move the directory's mtime back by *delta* seconds and check
        # that stat() reports the new value.
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Return the file system name reported for the drive holding
            # *path*, or None when GetVolumeInformationW() fails.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                # A fractional timestamp must round-trip through utime().
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
405
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000406from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000407
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """check that os.environ object conform to mapping protocol"""
    type2test = None

    def setUp(self):
        # Snapshot the str (and, when supported, bytes) environment so
        # tearDown() can put it back, then install the reference entries.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        reference = self._reference()
        for key in reference:
            os.environ[key] = reference[key]

    def tearDown(self):
        # Restore the snapshots taken in setUp().
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        # Key/value pairs used as the reference mapping.
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        # os.environ itself, emptied, serves as the "empty mapping".
        os.environ.clear()
        return os.environ

    # Bug 1110478
    def test_update2(self):
        os.environ.clear()
        if not os.path.exists("/bin/sh"):
            return
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            output = popen.read().strip()
            self.assertEqual(output, "World")

    def test_os_popen_iter(self):
        # The object returned by os.popen() must be iterable line by line.
        if not os.path.exists("/bin/sh"):
            return
        command = "/bin/sh -c 'echo \"line1\nline2\nline3\"'"
        with os.popen(command) as popen:
            lines = iter(popen)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, setting in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(setting), str)

    def test_items(self):
        # Every reference pair installed by setUp() must be retrievable.
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        rendered_items = ', '.join(
            '{!r}: {!r}'.format(name, setting)
            for name, setting in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(rendered_items))

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        search_dirs = ['/monty', '/python', '', '/flying/circus']
        fake_env = {'PATH': os.pathsep.join(search_dirs)}

        original_environ = os.environ
        try:
            # Temporarily swap in a plain dict as os.environ.
            os.environ = dict(fake_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(search_dirs, os.get_exec_path())
            self.assertSequenceEqual(search_dirs,
                                     os.get_exec_path(env=None))
        finally:
            os.environ = original_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(search_dirs, os.get_exec_path(fake_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                                     ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                                     ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                                     ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            text_as_bytes = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], text_as_bytes)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        raw_as_text = raw.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], raw_as_text)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000534
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # 'with' guarantees the handle is closed even if write() fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        # Without symlink support SUB2 simply has no directory kids.
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        entries = list(os.walk(walk_path))
        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TEST1, SUB1, SUB11, SUB2
        #     flipped:  TEST1, SUB2, SUB1, SUB11
        flipped = entries[0][1][0] != "SUB1"
        entries[0][1].sort()
        self.assertEqual(entries[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[1 + flipped],
                         (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 + flipped], (sub11_path, [], []))
        self.assertEqual(entries[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        entries = []
        for root, dirs, files in os.walk(walk_path):
            entries.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to
                # entries!
                dirs.remove('SUB1')
        self.assertEqual(len(entries), 2)
        self.assertEqual(entries[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(entries[1], sub2_tree)

        # Walk bottom-up.
        entries = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TEST1
        #     flipped:  SUB2, SUB11, SUB1, TEST1
        flipped = entries[3][1][0] != "SUB1"
        entries[3][1].sort()
        self.assertEqual(entries[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[flipped], (sub11_path, [], []))
        self.assertEqual(entries[flipped + 1],
                         (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is removed like a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000642
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a freshly created TESTFN."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion below fails, so
        # the temporary mask cannot leak into other tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode,
                              exist_ok=False)
            # exist_ok=True with a different mode is still an error here.
            self.assertRaises(OSError, os.makedirs, path, 0o776,
                              exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Always remove the file: tearDown() calls os.removedirs(), which
        # cannot deal with a regular file left behind by a failed assertion.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
694
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        """Writing to os.devnull succeeds and reading from it yields ''."""
        # Context managers close the handles even if the write, the read
        # or the assertion raises.
        with open(os.devnull, 'w') as f:
            f.write('hello')
        with open(os.devnull, 'r') as f:
            self.assertEqual(f.read(), '')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000703
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom(self):
        """os.urandom(n) returns exactly n bytes for several sizes."""
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            # os.urandom() is not implemented here; nothing to check.
            pass
713
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager replacing os.execv and os.execve with recording stubs.

    Yields the list that the stubs append to; each entry is a tuple
    (function name, first arg, remaining args). Both stubs always raise,
    since a real exec call would never return. If *defpath* is not None,
    os.defpath is overridden with it. The original os.execv, os.execve
    and os.defpath are restored on exit.
    """
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Remember the real attributes before patching anything.
    saved = (os.execv, os.execve, os.defpath)
    os.execv, os.execve = mock_execv, mock_execve
    try:
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv, os.execve, os.defpath = saved
746
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        self.assertRaises(OSError, os.execvpe, 'no such app-',
                          ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Exercise os._execvpe() with a program name of *test_type*.

        *test_type* is either str or bytes. The exec functions are
        replaced by _execvpe_mockup(), so nothing is actually executed.
        """
        program_path = os.sep + 'absolutepath'
        env = {'spam': 'beans'}
        if test_type is bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            path_key = b'PATH'
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # The expected execve() path is bytes except on Windows.
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
            path_key = 'PATH'

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        # The bytes variant is only run on non-Windows platforms.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000810
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000811
class Win32ErrorTests(unittest.TestCase):
    """Check that failing os calls raise WindowsError."""

    def test_rename(self):
        with self.assertRaises(WindowsError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir() must fail while a file of the same name exists.
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(WindowsError):
                os.mkdir(support.TESTFN)
        finally:
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000835
class TestInvalidFD(unittest.TestCase):
    """Check how fd-taking os functions behave on a bad file descriptor."""

    # Functions called with the descriptor as their only argument; one
    # test method per name is generated in the loop below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms

    def get_single(f):
        # Class-body helper: build the test method for os.<f>. The
        # generated test does nothing when os lacks that function.
        def helper(self):
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper

    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require an OSError with errno EBADF."""
        try:
            f(support.make_bad_fd(), *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
            return
        self.fail("%r didn't raise a OSError with a bad file descriptor"
                  % f)

    def test_isatty(self):
        if not hasattr(os, "isatty"):
            return
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    def test_closerange(self):
        if not hasattr(os, "closerange"):
            return
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd + i)
            except OSError:
                continue
            # fd + i is a valid descriptor: stop before it.
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    def test_dup2(self):
        if not hasattr(os, "dup2"):
            return
        self.check(os.dup2, 20)

    def test_fchmod(self):
        if not hasattr(os, "fchmod"):
            return
        self.check(os.fchmod, 0)

    def test_fchown(self):
        if not hasattr(os, "fchown"):
            return
        self.check(os.fchown, -1, -1)

    def test_fpathconf(self):
        if not hasattr(os, "fpathconf"):
            return
        self.check(os.fpathconf, "PC_NAME_MAX")

    def test_ftruncate(self):
        if not hasattr(os, "ftruncate"):
            return
        self.check(os.ftruncate, 0)

    def test_lseek(self):
        if not hasattr(os, "lseek"):
            return
        self.check(os.lseek, 0, 0)

    def test_read(self):
        if not hasattr(os, "read"):
            return
        self.check(os.read, 1)

    def test_tcsetpgrpt(self):
        if not hasattr(os, "tcsetpgrp"):
            return
        self.check(os.tcsetpgrp, 0)

    def test_write(self):
        if not hasattr(os, "write"):
            return
        self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000913
Brian Curtin1b9df392010-11-24 20:24:31 +0000914
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files the test managed to create.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        """Create file1, hard-link it to file2 and check that both names
        open the same file."""
        with open(file1, "w") as source:
            source.write("test")

        os.link(file1, file2)
        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(bytes(self.file1, encoding),
                        bytes(self.file2, encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Update the attributes so tearDown() removes the renamed files.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
949
if sys.platform != 'win32':
    # Off Windows the Win32-only class is replaced by an empty stub.
    class Win32ErrorTests(unittest.TestCase):
        pass

    class PosixUidGidTests(unittest.TestCase):
        """Tests for the uid/gid setters; each test is only defined
        when os provides the corresponding function."""

        if hasattr(os, 'setuid'):
            def test_setuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setuid(0)
                with self.assertRaises(OverflowError):
                    os.setuid(1<<32)

        if hasattr(os, 'setgid'):
            def test_setgid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setgid(0)
                with self.assertRaises(OverflowError):
                    os.setgid(1<<32)

        if hasattr(os, 'seteuid'):
            def test_seteuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.seteuid(0)
                with self.assertRaises(OverflowError):
                    os.seteuid(1<<32)

        if hasattr(os, 'setegid'):
            def test_setegid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setegid(0)
                with self.assertRaises(OverflowError):
                    os.setegid(1<<32)

        if hasattr(os, 'setreuid'):
            def test_setreuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setreuid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(0, 1<<32)

            def test_setreuid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                command = [
                    sys.executable, '-c',
                    'import os,sys;os.setreuid(-1,-1);sys.exit(0)']
                subprocess.check_call(command)

        if hasattr(os, 'setregid'):
            def test_setregid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setregid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(0, 1<<32)

            def test_setregid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                command = [
                    sys.executable, '-c',
                    'import os,sys;os.setregid(-1,-1);sys.exit(0)']
                subprocess.check_call(command)

    class Pep383Tests(unittest.TestCase):
        """Tests os functions on a directory of non-ASCII file names."""

        def setUp(self):
            self.dir = support.TESTFN_UNENCODABLE or support.TESTFN
            self.bdir = os.fsencode(self.dir)

            # Collect the candidate names that can be encoded to bytes.
            candidates = [support.TESTFN_UNICODE]
            if support.TESTFN_UNENCODABLE:
                candidates.append(support.TESTFN_UNENCODABLE)
            encoded_names = []
            for candidate in candidates:
                try:
                    encoded_names.append(os.fsencode(candidate))
                except UnicodeEncodeError:
                    continue
            if not encoded_names:
                self.skipTest("couldn't create any non-ascii filename")

            self.unicodefn = set()
            os.mkdir(self.dir)
            try:
                for encoded in encoded_names:
                    # Create an empty file under its bytes name.
                    open(os.path.join(self.bdir, encoded), "w").close()
                    decoded = os.fsdecode(encoded)
                    if decoded in self.unicodefn:
                        raise ValueError("duplicate filename")
                    self.unicodefn.add(decoded)
            except BaseException:
                # tearDown() does not run when setUp() fails, so clean up
                # here before propagating the error.
                shutil.rmtree(self.dir)
                raise

        def tearDown(self):
            shutil.rmtree(self.dir)

        def test_listdir(self):
            listed = set(os.listdir(self.dir))
            self.assertEqual(listed, self.unicodefn)

        def test_open(self):
            for name in self.unicodefn:
                open(os.path.join(self.dir, name)).close()

        def test_stat(self):
            for name in self.unicodefn:
                os.stat(os.path.join(self.dir, name))
else:
    # On Windows the POSIX-only classes are replaced by empty stubs.
    class PosixUidGidTests(unittest.TestCase):
        pass
    class Pep383Tests(unittest.TestCase):
        pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001063
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        """Kill a child interpreter with *sig* and check its exit code."""
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Reached on timeout or when the child exited before reporting.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child running
        win_console_handler.py and fail if the child keeps running.

        *name* is only used in the failure message.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping on every exit path of the test.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        # NOTE(review): the child presumably sets the shared byte to 1 once
        # its handler is installed -- confirm in win_console_handler.py.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # Skip the kill when the child has already exited, so the
            # failure below is reported instead of an os.kill() error.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; a
        # plain truth test would also treat exit code 0 as "running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1178
1179
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and related functions on Windows."""

    # Link names are relative paths; the targets are this test file and
    # the directory that contains it.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: the targets exist and no link name is in use.
        for required in (self.dirlink_target, self.filelink_target):
            assert os.path.exists(required)
        for unused in (self.dirlink, self.filelink, self.missing_link):
            assert not os.path.exists(unused)

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is used here because this link's target is missing.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        link = self.dirlink
        os.symlink(self.dirlink_target, link)
        self.assertTrue(os.path.exists(link))
        self.assertTrue(os.path.isdir(link))
        self.assertTrue(os.path.islink(link))
        self.check_stat(link, self.dirlink_target)

    def test_file_link(self):
        link = self.filelink
        os.symlink(self.filelink_target, link)
        self.assertTrue(os.path.exists(link))
        self.assertTrue(os.path.isfile(link))
        self.assertTrue(os.path.islink(link))
        self.check_stat(link, self.filelink_target)

    def _create_missing_dir_link(self):
        """Create a "directory" link to a non-existent target."""
        link = self.missing_link
        if os.path.lexists(link):
            os.remove(link)
        missing_target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(missing_target)
        # The third argument is target_is_directory.
        os.symlink(missing_target, link, True)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """stat() of *link* must equal stat() of *target*, while
        lstat() of *link* must differ from its stat()."""
        followed = os.stat(link)
        self.assertEqual(followed, os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))
1250
1251
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() come
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here; try the next one.
                continue
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001265
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001266
class PidTests(unittest.TestCase):
    """Tests for the process id functions of os."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        """A child interpreter reports this process as its parent."""
        child = subprocess.Popen(
            [sys.executable, '-c', 'import os; print(os.getppid())'],
            stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1276
1277
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Tests for os.getlogin() (currently skipped unconditionally)."""

    def test_getlogin(self):
        # The login name must not be empty.
        self.assertNotEqual(len(os.getlogin()), 0)
1286
1287
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        """Raise this process's priority value by one and read it back."""
        which = os.PRIO_PROCESS
        base = os.getpriority(which, os.getpid())
        os.setpriority(which, os.getpid(), base + 1)
        try:
            new_prio = os.getpriority(which, os.getpid())
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest(
      "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to put the original priority back; an EACCES failure
            # while doing so is tolerated.
            try:
                os.setpriority(which, os.getpid(), base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1310
1311
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Listening socket driven by asyncore in its own thread.

    An accepted connection is wrapped in a Handler, which stores all the
    data it receives so that a test can inspect it afterwards.
    """

    class Handler(asynchat.async_chat):
        """Connection handler that buffers everything it receives."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            self.closed = False
            # Greeting sent to the peer as soon as it is connected.
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            self.in_buffer.append(data)

        def get_data(self):
            """Return all the data received so far as one bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception being handled instead of hiding it.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        """Start the server thread and wait until run() has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the serving loop to finish and join the thread."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # "with" releases the lock even if the loop iteration raises;
            # explicit acquire()/release() would leave it held.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception being handled instead of hiding it.
        raise
1394
1395
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Exercise os.sendfile() by streaming a file to a SendfileTestServer.

    Each test connects a client socket to a freshly started server, pushes
    (part of) a file through os.sendfile() and compares what the server-side
    handler received with what was expected.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # Headers/trailers are not exercised on Linux and Solaris/SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        # Resources are released through cleanups rather than tearDown():
        # cleanups also run when a later step of setUp() fails, so the
        # server thread and the sockets are never left behind.  They run
        # in reverse order: file, client socket, then server.
        self.addCleanup(self._stop_server)
        self.client = socket.socket()
        self.addCleanup(self.client.close)
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        self.fileno = self.file.fileno()

    def _stop_server(self):
        """Stop the server unless the test already did (via wait())."""
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError propagates.  Returns the value returned by os.sendfile().
        *headers* and *trailers* default to empty lists and are only passed
        on when SUPPORT_HEADERS_TRAILERS is true.
        """
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                # EAGAIN / EBUSY: we have to retry sending the data.
                # Everything else (ECONNRESET included) is a real failure.
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # a negative offset must be rejected with EINVAL
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            # first call sends a 512 byte header plus the start of the file
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            # NOTE(review): assumes the first call sent the header and a
            # full 4096 bytes of file data -- confirm for partial sends.
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            # Compare the payload itself; equal hashes do not prove equal
            # data.
            self.assertEqual(len(data), len(expected_data))
            self.assertEqual(data, expected_data)

        def test_trailers(self):
            # a trailer must follow the file content
            TESTFN2 = support.TESTFN + "2"
            # Registered before the file is created so that it is removed
            # even if one of the following steps fails.
            self.addCleanup(support.unlink, TESTFN2)
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcde12345")

        if hasattr(os, "SF_NODISKIO"):
            def test_flags(self):
                # EBUSY / EAGAIN are acceptable outcomes with SF_NODISKIO
                try:
                    os.sendfile(self.sockno, self.fileno, 0, 4096,
                                flags=os.SF_NODISKIO)
                except OSError as err:
                    if err.errno not in (errno.EBUSY, errno.EAGAIN):
                        raise
1567
1568
def test_main():
    """Run the listed test case classes through support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
    )
    support.run_unittest(*test_classes)
Fred Drake2e2be372001-09-20 21:33:42 +00001592
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()