blob: 35aa7fa75edd879bb0a2f20e644bb273e80466b0 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
17import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000018import asyncore
19import asynchat
20import socket
21try:
22 import threading
23except ImportError:
24 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000025
Mark Dickinson7cf03892010-04-16 13:45:35 +000026# Detect whether we're on a Linux system that uses the (now outdated
27# and unmaintained) linuxthreads threading library. There's an issue
28# when combining linuxthreads with a failed execv call: see
29# http://bugs.python.org/issue4970.
Mark Dickinson89589c92010-04-16 13:51:27 +000030if (hasattr(os, "confstr_names") and
31 "CS_GNU_LIBPTHREAD_VERSION" in os.confstr_names):
Mark Dickinson7cf03892010-04-16 13:45:35 +000032 libpthread = os.confstr("CS_GNU_LIBPTHREAD_VERSION")
33 USING_LINUXTHREADS= libpthread.startswith("linuxthreads")
34else:
35 USING_LINUXTHREADS= False
Brian Curtineb24d742010-04-12 17:16:38 +000036
Thomas Wouters0e3f5912006-08-11 14:57:12 +000037# Tests creating TESTFN
38class FileTests(unittest.TestCase):
39 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040 if os.path.exists(support.TESTFN):
41 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000042 tearDown = setUp
43
44 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000045 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000046 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000047 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000048
Christian Heimesfdab48e2008-01-20 09:06:41 +000049 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000050 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
51 # We must allocate two consecutive file descriptors, otherwise
52 # it will mess up other file descriptors (perhaps even the three
53 # standard ones).
54 second = os.dup(first)
55 try:
56 retries = 0
57 while second != first + 1:
58 os.close(first)
59 retries += 1
60 if retries > 10:
61 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000062 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000063 first, second = second, os.dup(second)
64 finally:
65 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000066 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000067 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000068 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000069
Benjamin Peterson1cc6df92010-06-30 17:39:45 +000070 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +000071 def test_rename(self):
72 path = support.TESTFN
73 old = sys.getrefcount(path)
74 self.assertRaises(TypeError, os.rename, path, 0)
75 new = sys.getrefcount(path)
76 self.assertEqual(old, new)
77
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000078 def test_read(self):
79 with open(support.TESTFN, "w+b") as fobj:
80 fobj.write(b"spam")
81 fobj.flush()
82 fd = fobj.fileno()
83 os.lseek(fd, 0, 0)
84 s = os.read(fd, 4)
85 self.assertEqual(type(s), bytes)
86 self.assertEqual(s, b"spam")
87
88 def test_write(self):
89 # os.write() accepts bytes- and buffer-like objects but not strings
90 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
91 self.assertRaises(TypeError, os.write, fd, "beans")
92 os.write(fd, b"bacon\n")
93 os.write(fd, bytearray(b"eggs\n"))
94 os.write(fd, memoryview(b"spam\n"))
95 os.close(fd)
96 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +000097 self.assertEqual(fobj.read().splitlines(),
98 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000099
Victor Stinnere0daff12011-03-20 23:36:35 +0100100 def write_windows_console(self, *args):
101 retcode = subprocess.call(args,
102 # use a new console to not flood the test output
103 creationflags=subprocess.CREATE_NEW_CONSOLE,
104 # use a shell to hide the console window (SW_HIDE)
105 shell=True)
106 self.assertEqual(retcode, 0)
107
108 @unittest.skipUnless(sys.platform == 'win32',
109 'test specific to the Windows console')
110 def test_write_windows_console(self):
111 # Issue #11395: the Windows console returns an error (12: not enough
112 # space error) on writing into stdout if stdout mode is binary and the
113 # length is greater than 66,000 bytes (or less, depending on heap
114 # usage).
115 code = "print('x' * 100000)"
116 self.write_windows_console(sys.executable, "-c", code)
117 self.write_windows_console(sys.executable, "-u", "-c", code)
118
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000119
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000120class TemporaryFileTests(unittest.TestCase):
121 def setUp(self):
122 self.files = []
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000123 os.mkdir(support.TESTFN)
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000124
125 def tearDown(self):
126 for name in self.files:
127 os.unlink(name)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000128 os.rmdir(support.TESTFN)
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000129
130 def check_tempfile(self, name):
131 # make sure it doesn't already exist:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000132 self.assertFalse(os.path.exists(name),
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000133 "file already exists for temporary file")
134 # make sure we can create the file
135 open(name, "w")
136 self.files.append(name)
137
138 def test_tempnam(self):
139 if not hasattr(os, "tempnam"):
140 return
141 warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
142 r"test_os$")
143 self.check_tempfile(os.tempnam())
144
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000145 name = os.tempnam(support.TESTFN)
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000146 self.check_tempfile(name)
147
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000148 name = os.tempnam(support.TESTFN, "pfx")
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000149 self.assertTrue(os.path.basename(name)[:3] == "pfx")
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000150 self.check_tempfile(name)
151
152 def test_tmpfile(self):
153 if not hasattr(os, "tmpfile"):
154 return
155 # As with test_tmpnam() below, the Windows implementation of tmpfile()
156 # attempts to create a file in the root directory of the current drive.
157 # On Vista and Server 2008, this test will always fail for normal users
158 # as writing to the root directory requires elevated privileges. With
159 # XP and below, the semantics of tmpfile() are the same, but the user
160 # running the test is more likely to have administrative privileges on
161 # their account already. If that's the case, then os.tmpfile() should
162 # work. In order to make this test as useful as possible, rather than
163 # trying to detect Windows versions or whether or not the user has the
164 # right permissions, just try and create a file in the root directory
165 # and see if it raises a 'Permission denied' OSError. If it does, then
166 # test that a subsequent call to os.tmpfile() raises the same error. If
167 # it doesn't, assume we're on XP or below and the user running the test
168 # has administrative privileges, and proceed with the test as normal.
169 if sys.platform == 'win32':
170 name = '\\python_test_os_test_tmpfile.txt'
171 if os.path.exists(name):
172 os.remove(name)
173 try:
174 fp = open(name, 'w')
175 except IOError as first:
176 # open() failed, assert tmpfile() fails in the same way.
177 # Although open() raises an IOError and os.tmpfile() raises an
178 # OSError(), 'args' will be (13, 'Permission denied') in both
179 # cases.
180 try:
181 fp = os.tmpfile()
182 except OSError as second:
183 self.assertEqual(first.args, second.args)
184 else:
185 self.fail("expected os.tmpfile() to raise OSError")
186 return
187 else:
188 # open() worked, therefore, tmpfile() should work. Close our
189 # dummy file and proceed with the test as normal.
190 fp.close()
191 os.remove(name)
192
193 fp = os.tmpfile()
194 fp.write("foobar")
195 fp.seek(0,0)
196 s = fp.read()
197 fp.close()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000198 self.assertTrue(s == "foobar")
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000199
200 def test_tmpnam(self):
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000201 if not hasattr(os, "tmpnam"):
202 return
203 warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
204 r"test_os$")
205 name = os.tmpnam()
206 if sys.platform in ("win32",):
207 # The Windows tmpnam() seems useless. From the MS docs:
208 #
209 # The character string that tmpnam creates consists of
210 # the path prefix, defined by the entry P_tmpdir in the
211 # file STDIO.H, followed by a sequence consisting of the
212 # digit characters '0' through '9'; the numerical value
213 # of this string is in the range 1 - 65,535. Changing the
214 # definitions of L_tmpnam or P_tmpdir in STDIO.H does not
215 # change the operation of tmpnam.
216 #
217 # The really bizarre part is that, at least under MSVC6,
218 # P_tmpdir is "\\". That is, the path returned refers to
219 # the root of the current drive. That's a terrible place to
220 # put temp files, and, depending on privileges, the user
221 # may not even be able to open a file in the root directory.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000222 self.assertFalse(os.path.exists(name),
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000223 "file already exists for temporary file")
224 else:
225 self.check_tempfile(name)
226
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000227 def fdopen_helper(self, *args):
228 fd = os.open(support.TESTFN, os.O_RDONLY)
229 fp2 = os.fdopen(fd, *args)
230 fp2.close()
231
232 def test_fdopen(self):
233 self.fdopen_helper()
234 self.fdopen_helper('r')
235 self.fdopen_helper('r', 100)
236
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000237# Test attributes on return values from os.*stat* family.
238class StatAttributeTests(unittest.TestCase):
239 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000240 os.mkdir(support.TESTFN)
241 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000242 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000243 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000244 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000245
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000246 def tearDown(self):
247 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000248 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249
Antoine Pitrou38425292010-09-21 18:19:07 +0000250 def check_stat_attributes(self, fname):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000251 if not hasattr(os, "stat"):
252 return
253
254 import stat
Antoine Pitrou38425292010-09-21 18:19:07 +0000255 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000256
257 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000258 self.assertEqual(result[stat.ST_SIZE], 3)
259 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000260
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000261 # Make sure all the attributes are there
262 members = dir(result)
263 for name in dir(stat):
264 if name[:3] == 'ST_':
265 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000266 if name.endswith("TIME"):
267 def trunc(x): return int(x)
268 else:
269 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000270 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000271 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000272 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000273
274 try:
275 result[200]
276 self.fail("No exception thrown")
277 except IndexError:
278 pass
279
280 # Make sure that assignment fails
281 try:
282 result.st_mode = 1
283 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000284 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000285 pass
286
287 try:
288 result.st_rdev = 1
289 self.fail("No exception thrown")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000290 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000291 pass
292
293 try:
294 result.parrot = 1
295 self.fail("No exception thrown")
296 except AttributeError:
297 pass
298
299 # Use the stat_result constructor with a too-short tuple.
300 try:
301 result2 = os.stat_result((10,))
302 self.fail("No exception thrown")
303 except TypeError:
304 pass
305
Ezio Melotti42da6632011-03-15 05:18:48 +0200306 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000307 try:
308 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
309 except TypeError:
310 pass
311
Antoine Pitrou38425292010-09-21 18:19:07 +0000312 def test_stat_attributes(self):
313 self.check_stat_attributes(self.fname)
314
315 def test_stat_attributes_bytes(self):
316 try:
317 fname = self.fname.encode(sys.getfilesystemencoding())
318 except UnicodeEncodeError:
319 self.skipTest("cannot encode %a for the filesystem" % self.fname)
320 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000321
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000322 def test_statvfs_attributes(self):
323 if not hasattr(os, "statvfs"):
324 return
325
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000326 try:
327 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000328 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000329 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000330 if e.errno == errno.ENOSYS:
331 return
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000332
333 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000334 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000335
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000336 # Make sure all the attributes are there.
337 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
338 'ffree', 'favail', 'flag', 'namemax')
339 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000340 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000341
342 # Make sure that assignment really fails
343 try:
344 result.f_bfree = 1
345 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000346 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000347 pass
348
349 try:
350 result.parrot = 1
351 self.fail("No exception thrown")
352 except AttributeError:
353 pass
354
355 # Use the constructor with a too-short tuple.
356 try:
357 result2 = os.statvfs_result((10,))
358 self.fail("No exception thrown")
359 except TypeError:
360 pass
361
Ezio Melotti42da6632011-03-15 05:18:48 +0200362 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000363 try:
364 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
365 except TypeError:
366 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000367
Thomas Wouters89f507f2006-12-13 04:49:30 +0000368 def test_utime_dir(self):
369 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000370 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000371 # round to int, because some systems may support sub-second
372 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000373 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
374 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000375 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000376
377 # Restrict test to Win32, since there is no guarantee other
378 # systems support centiseconds
379 if sys.platform == 'win32':
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000380 def get_file_system(path):
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000381 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000382 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000383 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000384 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000385 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000386 return buf.value
387
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000388 if get_file_system(support.TESTFN) == "NTFS":
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000389 def test_1565150(self):
390 t1 = 1159195039.25
391 os.utime(self.fname, (t1, t1))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000392 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000393
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000394 def test_large_time(self):
395 t1 = 5000000000 # some day in 2128
396 os.utime(self.fname, (t1, t1))
397 self.assertEqual(os.stat(self.fname).st_mtime, t1)
398
Guido van Rossumd8faa362007-04-27 19:54:29 +0000399 def test_1686475(self):
400 # Verify that an open file can be stat'ed
401 try:
402 os.stat(r"c:\pagefile.sys")
403 except WindowsError as e:
Benjamin Petersonc4fe6f32008-08-19 18:57:56 +0000404 if e.errno == 2: # file does not exist; cannot run test
Guido van Rossumd8faa362007-04-27 19:54:29 +0000405 return
406 self.fail("Could not stat pagefile.sys")
407
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000408from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000409
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000410class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000411 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000412 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000413
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000414 def setUp(self):
415 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000416 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000417 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000418 for key, value in self._reference().items():
419 os.environ[key] = value
420
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000421 def tearDown(self):
422 os.environ.clear()
423 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000424 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000425 os.environb.clear()
426 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000427
Christian Heimes90333392007-11-01 19:08:42 +0000428 def _reference(self):
429 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
430
431 def _empty_mapping(self):
432 os.environ.clear()
433 return os.environ
434
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000435 # Bug 1110478
Martin v. Löwis5510f652005-02-17 21:23:20 +0000436 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000437 os.environ.clear()
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000438 if os.path.exists("/bin/sh"):
439 os.environ.update(HELLO="World")
Brian Curtin810921b2010-10-30 21:24:21 +0000440 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
441 value = popen.read().strip()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000442 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000443
Christian Heimes1a13d592007-11-08 14:16:55 +0000444 def test_os_popen_iter(self):
445 if os.path.exists("/bin/sh"):
Brian Curtin810921b2010-10-30 21:24:21 +0000446 with os.popen(
447 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
448 it = iter(popen)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000449 self.assertEqual(next(it), "line1\n")
450 self.assertEqual(next(it), "line2\n")
451 self.assertEqual(next(it), "line3\n")
Brian Curtin810921b2010-10-30 21:24:21 +0000452 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000453
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000454 # Verify environ keys and values from the OS are of the
455 # correct str type.
456 def test_keyvalue_types(self):
457 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000458 self.assertEqual(type(key), str)
459 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000460
Christian Heimes90333392007-11-01 19:08:42 +0000461 def test_items(self):
462 for key, value in self._reference().items():
463 self.assertEqual(os.environ.get(key), value)
464
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000465 # Issue 7310
466 def test___repr__(self):
467 """Check that the repr() of os.environ looks like environ({...})."""
468 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000469 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
470 '{!r}: {!r}'.format(key, value)
471 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000472
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000473 def test_get_exec_path(self):
474 defpath_list = os.defpath.split(os.pathsep)
475 test_path = ['/monty', '/python', '', '/flying/circus']
476 test_env = {'PATH': os.pathsep.join(test_path)}
477
478 saved_environ = os.environ
479 try:
480 os.environ = dict(test_env)
481 # Test that defaulting to os.environ works.
482 self.assertSequenceEqual(test_path, os.get_exec_path())
483 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
484 finally:
485 os.environ = saved_environ
486
487 # No PATH environment variable
488 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
489 # Empty PATH environment variable
490 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
491 # Supplied PATH environment variable
492 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
493
Victor Stinnerb745a742010-05-18 17:17:23 +0000494 if os.supports_bytes_environ:
495 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000496 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000497 # ignore BytesWarning warning
498 with warnings.catch_warnings(record=True):
499 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000500 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000501 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000502 pass
503 else:
504 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000505
506 # bytes key and/or value
507 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
508 ['abc'])
509 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
510 ['abc'])
511 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
512 ['abc'])
513
514 @unittest.skipUnless(os.supports_bytes_environ,
515 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000516 def test_environb(self):
517 # os.environ -> os.environb
518 value = 'euro\u20ac'
519 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000520 value_bytes = value.encode(sys.getfilesystemencoding(),
521 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000522 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000523 msg = "U+20AC character is not encodable to %s" % (
524 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000525 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000526 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000527 self.assertEqual(os.environ['unicode'], value)
528 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000529
530 # os.environb -> os.environ
531 value = b'\xff'
532 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000533 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000534 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000535 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000536
Tim Petersc4e09402003-04-25 07:11:48 +0000537class WalkTests(unittest.TestCase):
538 """Tests for os.walk()."""
539
540 def test_traversal(self):
541 import os
542 from os.path import join
543
544 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000545 # TESTFN/
546 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000547 # tmp1
548 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000549 # tmp2
550 # SUB11/ no kids
551 # SUB2/ a file kid and a dirsymlink kid
552 # tmp3
553 # link/ a symlink to TESTFN.2
554 # TEST2/
555 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000556 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000557 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000558 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000559 sub2_path = join(walk_path, "SUB2")
560 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000561 tmp2_path = join(sub1_path, "tmp2")
562 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000563 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000564 t2_path = join(support.TESTFN, "TEST2")
565 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Tim Petersc4e09402003-04-25 07:11:48 +0000566
567 # Create stuff.
568 os.makedirs(sub11_path)
569 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000570 os.makedirs(t2_path)
571 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000572 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000573 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
574 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000575 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000576 os.symlink(os.path.abspath(t2_path), link_path)
577 sub2_tree = (sub2_path, ["link"], ["tmp3"])
578 else:
579 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000580
581 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000582 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000583 self.assertEqual(len(all), 4)
584 # We can't know which order SUB1 and SUB2 will appear in.
585 # Not flipped: TESTFN, SUB1, SUB11, SUB2
586 # flipped: TESTFN, SUB2, SUB1, SUB11
587 flipped = all[0][1][0] != "SUB1"
588 all[0][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000589 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000590 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
591 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000592 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000593
594 # Prune the search.
595 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000596 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000597 all.append((root, dirs, files))
598 # Don't descend into SUB1.
599 if 'SUB1' in dirs:
600 # Note that this also mutates the dirs we appended to all!
601 dirs.remove('SUB1')
602 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000603 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
604 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000605
606 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000607 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000608 self.assertEqual(len(all), 4)
609 # We can't know which order SUB1 and SUB2 will appear in.
610 # Not flipped: SUB11, SUB1, SUB2, TESTFN
611 # flipped: SUB2, SUB11, SUB1, TESTFN
612 flipped = all[3][1][0] != "SUB1"
613 all[3][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000614 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000615 self.assertEqual(all[flipped], (sub11_path, [], []))
616 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000617 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000618
Brian Curtin3b4499c2010-12-28 14:31:47 +0000619 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000620 # Walk, following symlinks.
621 for root, dirs, files in os.walk(walk_path, followlinks=True):
622 if root == link_path:
623 self.assertEqual(dirs, [])
624 self.assertEqual(files, ["tmp4"])
625 break
626 else:
627 self.fail("Didn't follow symlink with followlinks=True")
628
629 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000630 # Tear everything down. This is a decent use for bottom-up on
631 # Windows, which doesn't have a recursive delete command. The
632 # (not so) subtlety is that rmdir will fail unless the dir's
633 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000634 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000635 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000636 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000637 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000638 dirname = os.path.join(root, name)
639 if not os.path.islink(dirname):
640 os.rmdir(dirname)
641 else:
642 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000643 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000644
Guido van Rossume7ba4952007-06-06 23:52:48 +0000645class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000646 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000647 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000648
649 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000650 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000651 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
652 os.makedirs(path) # Should work
653 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
654 os.makedirs(path)
655
656 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000657 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000658 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
659 os.makedirs(path)
660 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
661 'dir5', 'dir6')
662 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000663
Terry Reedy5a22b652010-12-02 07:05:56 +0000664 def test_exist_ok_existing_directory(self):
665 path = os.path.join(support.TESTFN, 'dir1')
666 mode = 0o777
667 old_mask = os.umask(0o022)
668 os.makedirs(path, mode)
669 self.assertRaises(OSError, os.makedirs, path, mode)
670 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
671 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
672 os.makedirs(path, mode=mode, exist_ok=True)
673 os.umask(old_mask)
674
675 def test_exist_ok_existing_regular_file(self):
676 base = support.TESTFN
677 path = os.path.join(support.TESTFN, 'dir1')
678 f = open(path, 'w')
679 f.write('abc')
680 f.close()
681 self.assertRaises(OSError, os.makedirs, path)
682 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
683 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
684 os.remove(path)
685
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000686 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000687 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000688 'dir4', 'dir5', 'dir6')
689 # If the tests failed, the bottom-most directory ('../dir6')
690 # may not have been created, so we look for the outermost directory
691 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000692 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000693 path = os.path.dirname(path)
694
695 os.removedirs(path)
696
Guido van Rossume7ba4952007-06-06 23:52:48 +0000697class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000698 def test_devnull(self):
Alex Martelli01c77c62006-08-24 02:58:11 +0000699 f = open(os.devnull, 'w')
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000700 f.write('hello')
701 f.close()
Alex Martelli01c77c62006-08-24 02:58:11 +0000702 f = open(os.devnull, 'r')
Tim Peters4182cfd2004-06-08 20:34:34 +0000703 self.assertEqual(f.read(), '')
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000704 f.close()
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000705
Guido van Rossume7ba4952007-06-06 23:52:48 +0000706class URandomTests(unittest.TestCase):
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000707 def test_urandom(self):
708 try:
709 self.assertEqual(len(os.urandom(1)), 1)
710 self.assertEqual(len(os.urandom(10)), 10)
711 self.assertEqual(len(os.urandom(100)), 100)
712 self.assertEqual(len(os.urandom(1000)), 1000)
713 except NotImplementedError:
714 pass
715
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000716@contextlib.contextmanager
717def _execvpe_mockup(defpath=None):
718 """
719 Stubs out execv and execve functions when used as context manager.
720 Records exec calls. The mock execv and execve functions always raise an
721 exception as they would normally never return.
722 """
723 # A list of tuples containing (function name, first arg, args)
724 # of calls to execv or execve that have been made.
725 calls = []
726
727 def mock_execv(name, *args):
728 calls.append(('execv', name, args))
729 raise RuntimeError("execv called")
730
731 def mock_execve(name, *args):
732 calls.append(('execve', name, args))
733 raise OSError(errno.ENOTDIR, "execve called")
734
735 try:
736 orig_execv = os.execv
737 orig_execve = os.execve
738 orig_defpath = os.defpath
739 os.execv = mock_execv
740 os.execve = mock_execve
741 if defpath is not None:
742 os.defpath = defpath
743 yield calls
744 finally:
745 os.execv = orig_execv
746 os.execve = orig_execve
747 os.defpath = orig_defpath
748
Guido van Rossume7ba4952007-06-06 23:52:48 +0000749class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000750 @unittest.skipIf(USING_LINUXTHREADS,
751 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +0000752 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000753 self.assertRaises(OSError, os.execvpe, 'no such app-',
754 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +0000755
Thomas Heller6790d602007-08-30 17:15:14 +0000756 def test_execvpe_with_bad_arglist(self):
757 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
758
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000759 @unittest.skipUnless(hasattr(os, '_execvpe'),
760 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +0000761 def _test_internal_execvpe(self, test_type):
762 program_path = os.sep + 'absolutepath'
763 if test_type is bytes:
764 program = b'executable'
765 fullpath = os.path.join(os.fsencode(program_path), program)
766 native_fullpath = fullpath
767 arguments = [b'progname', 'arg1', 'arg2']
768 else:
769 program = 'executable'
770 arguments = ['progname', 'arg1', 'arg2']
771 fullpath = os.path.join(program_path, program)
772 if os.name != "nt":
773 native_fullpath = os.fsencode(fullpath)
774 else:
775 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000776 env = {'spam': 'beans'}
777
Victor Stinnerb745a742010-05-18 17:17:23 +0000778 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000779 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000780 self.assertRaises(RuntimeError,
781 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000782 self.assertEqual(len(calls), 1)
783 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
784
Victor Stinnerb745a742010-05-18 17:17:23 +0000785 # test os._execvpe() with a relative path:
786 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000787 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000788 self.assertRaises(OSError,
789 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000790 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +0000791 self.assertSequenceEqual(calls[0],
792 ('execve', native_fullpath, (arguments, env)))
793
794 # test os._execvpe() with a relative path:
795 # os.get_exec_path() reads the 'PATH' variable
796 with _execvpe_mockup() as calls:
797 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +0000798 if test_type is bytes:
799 env_path[b'PATH'] = program_path
800 else:
801 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +0000802 self.assertRaises(OSError,
803 os._execvpe, program, arguments, env=env_path)
804 self.assertEqual(len(calls), 1)
805 self.assertSequenceEqual(calls[0],
806 ('execve', native_fullpath, (arguments, env_path)))
807
808 def test_internal_execvpe_str(self):
809 self._test_internal_execvpe(str)
810 if os.name != "nt":
811 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000812
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000813
Thomas Wouters477c8d52006-05-27 19:21:47 +0000814class Win32ErrorTests(unittest.TestCase):
815 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000816 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000817
818 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000819 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000820
821 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000822 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000823
824 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000825 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +0000826 try:
827 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
828 finally:
829 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000830 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000831
832 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000833 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000834
Thomas Wouters477c8d52006-05-27 19:21:47 +0000835 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +0000836 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000837
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000838class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000839 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000840 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
841 #singles.append("close")
842 #We omit close because it doesn'r raise an exception on some platforms
843 def get_single(f):
844 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000845 if hasattr(os, f):
846 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000847 return helper
848 for f in singles:
849 locals()["test_"+f] = get_single(f)
850
Benjamin Peterson7522c742009-01-19 21:00:09 +0000851 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000852 try:
853 f(support.make_bad_fd(), *args)
854 except OSError as e:
855 self.assertEqual(e.errno, errno.EBADF)
856 else:
857 self.fail("%r didn't raise a OSError with a bad file descriptor"
858 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000859
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000860 def test_isatty(self):
861 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000862 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000863
864 def test_closerange(self):
865 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000866 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000867 # Make sure none of the descriptors we are about to close are
868 # currently valid (issue 6542).
869 for i in range(10):
870 try: os.fstat(fd+i)
871 except OSError:
872 pass
873 else:
874 break
875 if i < 2:
876 raise unittest.SkipTest(
877 "Unable to acquire a range of invalid file descriptors")
878 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000879
880 def test_dup2(self):
881 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000882 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000883
884 def test_fchmod(self):
885 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000886 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000887
888 def test_fchown(self):
889 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000890 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000891
892 def test_fpathconf(self):
893 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000894 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000895
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000896 def test_ftruncate(self):
897 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000898 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000899
900 def test_lseek(self):
901 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000902 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000903
904 def test_read(self):
905 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000906 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000907
908 def test_tcsetpgrpt(self):
909 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000910 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000911
912 def test_write(self):
913 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000914 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000915
Brian Curtin1b9df392010-11-24 20:24:31 +0000916
917class LinkTests(unittest.TestCase):
918 def setUp(self):
919 self.file1 = support.TESTFN
920 self.file2 = os.path.join(support.TESTFN + "2")
921
Brian Curtinc0abc4e2010-11-30 23:46:54 +0000922 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +0000923 for file in (self.file1, self.file2):
924 if os.path.exists(file):
925 os.unlink(file)
926
Brian Curtin1b9df392010-11-24 20:24:31 +0000927 def _test_link(self, file1, file2):
928 with open(file1, "w") as f1:
929 f1.write("test")
930
931 os.link(file1, file2)
932 with open(file1, "r") as f1, open(file2, "r") as f2:
933 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
934
935 def test_link(self):
936 self._test_link(self.file1, self.file2)
937
938 def test_link_bytes(self):
939 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
940 bytes(self.file2, sys.getfilesystemencoding()))
941
Brian Curtinf498b752010-11-30 15:54:04 +0000942 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +0000943 try:
Brian Curtinf498b752010-11-30 15:54:04 +0000944 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +0000945 except UnicodeError:
946 raise unittest.SkipTest("Unable to encode for this platform.")
947
Brian Curtinf498b752010-11-30 15:54:04 +0000948 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +0000949 self.file2 = self.file1 + "2"
950 self._test_link(self.file1, self.file2)
951
Thomas Wouters477c8d52006-05-27 19:21:47 +0000952if sys.platform != 'win32':
953 class Win32ErrorTests(unittest.TestCase):
954 pass
955
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000956 class PosixUidGidTests(unittest.TestCase):
957 if hasattr(os, 'setuid'):
958 def test_setuid(self):
959 if os.getuid() != 0:
960 self.assertRaises(os.error, os.setuid, 0)
961 self.assertRaises(OverflowError, os.setuid, 1<<32)
962
963 if hasattr(os, 'setgid'):
964 def test_setgid(self):
965 if os.getuid() != 0:
966 self.assertRaises(os.error, os.setgid, 0)
967 self.assertRaises(OverflowError, os.setgid, 1<<32)
968
969 if hasattr(os, 'seteuid'):
970 def test_seteuid(self):
971 if os.getuid() != 0:
972 self.assertRaises(os.error, os.seteuid, 0)
973 self.assertRaises(OverflowError, os.seteuid, 1<<32)
974
975 if hasattr(os, 'setegid'):
976 def test_setegid(self):
977 if os.getuid() != 0:
978 self.assertRaises(os.error, os.setegid, 0)
979 self.assertRaises(OverflowError, os.setegid, 1<<32)
980
981 if hasattr(os, 'setreuid'):
982 def test_setreuid(self):
983 if os.getuid() != 0:
984 self.assertRaises(os.error, os.setreuid, 0, 0)
985 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
986 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000987
988 def test_setreuid_neg1(self):
989 # Needs to accept -1. We run this in a subprocess to avoid
990 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000991 subprocess.check_call([
992 sys.executable, '-c',
993 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000994
995 if hasattr(os, 'setregid'):
996 def test_setregid(self):
997 if os.getuid() != 0:
998 self.assertRaises(os.error, os.setregid, 0, 0)
999 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1000 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001001
1002 def test_setregid_neg1(self):
1003 # Needs to accept -1. We run this in a subprocess to avoid
1004 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001005 subprocess.check_call([
1006 sys.executable, '-c',
1007 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001008
1009 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001010 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001011 if support.TESTFN_UNENCODABLE:
1012 self.dir = support.TESTFN_UNENCODABLE
1013 else:
1014 self.dir = support.TESTFN
1015 self.bdir = os.fsencode(self.dir)
1016
1017 bytesfn = []
1018 def add_filename(fn):
1019 try:
1020 fn = os.fsencode(fn)
1021 except UnicodeEncodeError:
1022 return
1023 bytesfn.append(fn)
1024 add_filename(support.TESTFN_UNICODE)
1025 if support.TESTFN_UNENCODABLE:
1026 add_filename(support.TESTFN_UNENCODABLE)
1027 if not bytesfn:
1028 self.skipTest("couldn't create any non-ascii filename")
1029
1030 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001031 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001032 try:
1033 for fn in bytesfn:
1034 f = open(os.path.join(self.bdir, fn), "w")
1035 f.close()
Victor Stinnere8d51452010-08-19 01:05:19 +00001036 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001037 if fn in self.unicodefn:
1038 raise ValueError("duplicate filename")
1039 self.unicodefn.add(fn)
1040 except:
1041 shutil.rmtree(self.dir)
1042 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001043
1044 def tearDown(self):
1045 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001046
1047 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001048 expected = self.unicodefn
1049 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001050 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001051
1052 def test_open(self):
1053 for fn in self.unicodefn:
1054 f = open(os.path.join(self.dir, fn))
1055 f.close()
1056
1057 def test_stat(self):
1058 for fn in self.unicodefn:
1059 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001060else:
1061 class PosixUidGidTests(unittest.TestCase):
1062 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001063 class Pep383Tests(unittest.TestCase):
1064 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001065
Brian Curtineb24d742010-04-12 17:16:38 +00001066@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1067class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001068 def _kill(self, sig):
1069 # Start sys.executable as a subprocess and communicate from the
1070 # subprocess to the parent that the interpreter is ready. When it
1071 # becomes ready, send *sig* via os.kill to the subprocess and check
1072 # that the return code is equal to *sig*.
1073 import ctypes
1074 from ctypes import wintypes
1075 import msvcrt
1076
1077 # Since we can't access the contents of the process' stdout until the
1078 # process has exited, use PeekNamedPipe to see what's inside stdout
1079 # without waiting. This is done so we can tell that the interpreter
1080 # is started and running at a point where it could handle a signal.
1081 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1082 PeekNamedPipe.restype = wintypes.BOOL
1083 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1084 ctypes.POINTER(ctypes.c_char), # stdout buf
1085 wintypes.DWORD, # Buffer size
1086 ctypes.POINTER(wintypes.DWORD), # bytes read
1087 ctypes.POINTER(wintypes.DWORD), # bytes avail
1088 ctypes.POINTER(wintypes.DWORD)) # bytes left
1089 msg = "running"
1090 proc = subprocess.Popen([sys.executable, "-c",
1091 "import sys;"
1092 "sys.stdout.write('{}');"
1093 "sys.stdout.flush();"
1094 "input()".format(msg)],
1095 stdout=subprocess.PIPE,
1096 stderr=subprocess.PIPE,
1097 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001098 self.addCleanup(proc.stdout.close)
1099 self.addCleanup(proc.stderr.close)
1100 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001101
1102 count, max = 0, 100
1103 while count < max and proc.poll() is None:
1104 # Create a string buffer to store the result of stdout from the pipe
1105 buf = ctypes.create_string_buffer(len(msg))
1106 # Obtain the text currently in proc.stdout
1107 # Bytes read/avail/left are left as NULL and unused
1108 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1109 buf, ctypes.sizeof(buf), None, None, None)
1110 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1111 if buf.value:
1112 self.assertEqual(msg, buf.value.decode())
1113 break
1114 time.sleep(0.1)
1115 count += 1
1116 else:
1117 self.fail("Did not receive communication from the subprocess")
1118
Brian Curtineb24d742010-04-12 17:16:38 +00001119 os.kill(proc.pid, sig)
1120 self.assertEqual(proc.wait(), sig)
1121
1122 def test_kill_sigterm(self):
1123 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001124 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001125
1126 def test_kill_int(self):
1127 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001128 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001129
1130 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001131 tagname = "test_os_%s" % uuid.uuid1()
1132 m = mmap.mmap(-1, 1, tagname)
1133 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001134 # Run a script which has console control handling enabled.
1135 proc = subprocess.Popen([sys.executable,
1136 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001137 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001138 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1139 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001140 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001141 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001142 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001143 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001144 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001145 count += 1
1146 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001147 # Forcefully kill the process if we weren't able to signal it.
1148 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001149 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001150 os.kill(proc.pid, event)
1151 # proc.send_signal(event) could also be done here.
1152 # Allow time for the signal to be passed and the process to exit.
1153 time.sleep(0.5)
1154 if not proc.poll():
1155 # Forcefully kill the process if we weren't able to signal it.
1156 os.kill(proc.pid, signal.SIGINT)
1157 self.fail("subprocess did not stop on {}".format(name))
1158
1159 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1160 def test_CTRL_C_EVENT(self):
1161 from ctypes import wintypes
1162 import ctypes
1163
1164 # Make a NULL value by creating a pointer with no argument.
1165 NULL = ctypes.POINTER(ctypes.c_int)()
1166 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1167 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1168 wintypes.BOOL)
1169 SetConsoleCtrlHandler.restype = wintypes.BOOL
1170
1171 # Calling this with NULL and FALSE causes the calling process to
1172 # handle CTRL+C, rather than ignore it. This property is inherited
1173 # by subprocesses.
1174 SetConsoleCtrlHandler(NULL, 0)
1175
1176 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1177
1178 def test_CTRL_BREAK_EVENT(self):
1179 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1180
1181
Brian Curtind40e6f72010-07-08 21:39:08 +00001182@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001183@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001184class Win32SymlinkTests(unittest.TestCase):
1185 filelink = 'filelinktest'
1186 filelink_target = os.path.abspath(__file__)
1187 dirlink = 'dirlinktest'
1188 dirlink_target = os.path.dirname(filelink_target)
1189 missing_link = 'missing link'
1190
1191 def setUp(self):
1192 assert os.path.exists(self.dirlink_target)
1193 assert os.path.exists(self.filelink_target)
1194 assert not os.path.exists(self.dirlink)
1195 assert not os.path.exists(self.filelink)
1196 assert not os.path.exists(self.missing_link)
1197
1198 def tearDown(self):
1199 if os.path.exists(self.filelink):
1200 os.remove(self.filelink)
1201 if os.path.exists(self.dirlink):
1202 os.rmdir(self.dirlink)
1203 if os.path.lexists(self.missing_link):
1204 os.remove(self.missing_link)
1205
1206 def test_directory_link(self):
1207 os.symlink(self.dirlink_target, self.dirlink)
1208 self.assertTrue(os.path.exists(self.dirlink))
1209 self.assertTrue(os.path.isdir(self.dirlink))
1210 self.assertTrue(os.path.islink(self.dirlink))
1211 self.check_stat(self.dirlink, self.dirlink_target)
1212
1213 def test_file_link(self):
1214 os.symlink(self.filelink_target, self.filelink)
1215 self.assertTrue(os.path.exists(self.filelink))
1216 self.assertTrue(os.path.isfile(self.filelink))
1217 self.assertTrue(os.path.islink(self.filelink))
1218 self.check_stat(self.filelink, self.filelink_target)
1219
1220 def _create_missing_dir_link(self):
1221 'Create a "directory" link to a non-existent target'
1222 linkname = self.missing_link
1223 if os.path.lexists(linkname):
1224 os.remove(linkname)
1225 target = r'c:\\target does not exist.29r3c740'
1226 assert not os.path.exists(target)
1227 target_is_dir = True
1228 os.symlink(target, linkname, target_is_dir)
1229
1230 def test_remove_directory_link_to_missing_target(self):
1231 self._create_missing_dir_link()
1232 # For compatibility with Unix, os.remove will check the
1233 # directory status and call RemoveDirectory if the symlink
1234 # was created with target_is_dir==True.
1235 os.remove(self.missing_link)
1236
1237 @unittest.skip("currently fails; consider for improvement")
1238 def test_isdir_on_directory_link_to_missing_target(self):
1239 self._create_missing_dir_link()
1240 # consider having isdir return true for directory links
1241 self.assertTrue(os.path.isdir(self.missing_link))
1242
1243 @unittest.skip("currently fails; consider for improvement")
1244 def test_rmdir_on_directory_link_to_missing_target(self):
1245 self._create_missing_dir_link()
1246 # consider allowing rmdir to remove directory links
1247 os.rmdir(self.missing_link)
1248
1249 def check_stat(self, link, target):
1250 self.assertEqual(os.stat(link), os.stat(target))
1251 self.assertNotEqual(os.lstat(link), os.stat(link))
1252
1253
Victor Stinnere8d51452010-08-19 01:05:19 +00001254class FSEncodingTests(unittest.TestCase):
1255 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001256 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1257 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001258
Victor Stinnere8d51452010-08-19 01:05:19 +00001259 def test_identity(self):
1260 # assert fsdecode(fsencode(x)) == x
1261 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1262 try:
1263 bytesfn = os.fsencode(fn)
1264 except UnicodeEncodeError:
1265 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001266 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001267
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001268
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001269class PidTests(unittest.TestCase):
1270 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1271 def test_getppid(self):
1272 p = subprocess.Popen([sys.executable, '-c',
1273 'import os; print(os.getppid())'],
1274 stdout=subprocess.PIPE)
1275 stdout, _ = p.communicate()
1276 # We are the parent of our subprocess
1277 self.assertEqual(int(stdout), os.getpid())
1278
1279
Brian Curtin0151b8e2010-09-24 13:43:43 +00001280# The introduction of this TestCase caused at least two different errors on
1281# *nix buildbots. Temporarily skip this to let the buildbots move along.
1282@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001283@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1284class LoginTests(unittest.TestCase):
1285 def test_getlogin(self):
1286 user_name = os.getlogin()
1287 self.assertNotEqual(len(user_name), 0)
1288
1289
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001290@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1291 "needs os.getpriority and os.setpriority")
1292class ProgramPriorityTests(unittest.TestCase):
1293 """Tests for os.getpriority() and os.setpriority()."""
1294
1295 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001296
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001297 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1298 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1299 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001300 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1301 if base >= 19 and new_prio <= 19:
1302 raise unittest.SkipTest(
1303 "unable to reliably test setpriority at current nice level of %s" % base)
1304 else:
1305 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001306 finally:
1307 try:
1308 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1309 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001310 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001311 raise
1312
1313
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001314class SendfileTestServer(asyncore.dispatcher, threading.Thread):
1315
1316 class Handler(asynchat.async_chat):
1317
1318 def __init__(self, conn):
1319 asynchat.async_chat.__init__(self, conn)
1320 self.in_buffer = []
1321 self.closed = False
1322 self.push(b"220 ready\r\n")
1323
1324 def handle_read(self):
1325 data = self.recv(4096)
1326 self.in_buffer.append(data)
1327
1328 def get_data(self):
1329 return b''.join(self.in_buffer)
1330
1331 def handle_close(self):
1332 self.close()
1333 self.closed = True
1334
1335 def handle_error(self):
1336 raise
1337
1338 def __init__(self, address):
1339 threading.Thread.__init__(self)
1340 asyncore.dispatcher.__init__(self)
1341 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1342 self.bind(address)
1343 self.listen(5)
1344 self.host, self.port = self.socket.getsockname()[:2]
1345 self.handler_instance = None
1346 self._active = False
1347 self._active_lock = threading.Lock()
1348
1349 # --- public API
1350
1351 @property
1352 def running(self):
1353 return self._active
1354
1355 def start(self):
1356 assert not self.running
1357 self.__flag = threading.Event()
1358 threading.Thread.start(self)
1359 self.__flag.wait()
1360
1361 def stop(self):
1362 assert self.running
1363 self._active = False
1364 self.join()
1365
1366 def wait(self):
1367 # wait for handler connection to be closed, then stop the server
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001368 while not getattr(self.handler_instance, "closed", False):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001369 time.sleep(0.001)
1370 self.stop()
1371
1372 # --- internals
1373
1374 def run(self):
1375 self._active = True
1376 self.__flag.set()
1377 while self._active and asyncore.socket_map:
1378 self._active_lock.acquire()
1379 asyncore.loop(timeout=0.001, count=1)
1380 self._active_lock.release()
1381 asyncore.close_all()
1382
1383 def handle_accept(self):
1384 conn, addr = self.accept()
1385 self.handler_instance = self.Handler(conn)
1386
1387 def handle_connect(self):
1388 self.close()
1389 handle_read = handle_connect
1390
1391 def writable(self):
1392 return 0
1393
1394 def handle_error(self):
1395 raise
1396
1397
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001398@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001399@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1400class TestSendfile(unittest.TestCase):
1401
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001402 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001403 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001404 not sys.platform.startswith("solaris") and \
1405 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001406
1407 @classmethod
1408 def setUpClass(cls):
1409 with open(support.TESTFN, "wb") as f:
1410 f.write(cls.DATA)
1411
1412 @classmethod
1413 def tearDownClass(cls):
1414 support.unlink(support.TESTFN)
1415
1416 def setUp(self):
1417 self.server = SendfileTestServer((support.HOST, 0))
1418 self.server.start()
1419 self.client = socket.socket()
1420 self.client.connect((self.server.host, self.server.port))
1421 self.client.settimeout(1)
1422 # synchronize by waiting for "220 ready" response
1423 self.client.recv(1024)
1424 self.sockno = self.client.fileno()
1425 self.file = open(support.TESTFN, 'rb')
1426 self.fileno = self.file.fileno()
1427
1428 def tearDown(self):
1429 self.file.close()
1430 self.client.close()
1431 if self.server.running:
1432 self.server.stop()
1433
1434 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1435 """A higher level wrapper representing how an application is
1436 supposed to use sendfile().
1437 """
1438 while 1:
1439 try:
1440 if self.SUPPORT_HEADERS_TRAILERS:
1441 return os.sendfile(sock, file, offset, nbytes, headers,
1442 trailers)
1443 else:
1444 return os.sendfile(sock, file, offset, nbytes)
1445 except OSError as err:
1446 if err.errno == errno.ECONNRESET:
1447 # disconnected
1448 raise
1449 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1450 # we have to retry send data
1451 continue
1452 else:
1453 raise
1454
1455 def test_send_whole_file(self):
1456 # normal send
1457 total_sent = 0
1458 offset = 0
1459 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001460 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001461 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1462 if sent == 0:
1463 break
1464 offset += sent
1465 total_sent += sent
1466 self.assertTrue(sent <= nbytes)
1467 self.assertEqual(offset, total_sent)
1468
1469 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001470 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001471 self.client.close()
1472 self.server.wait()
1473 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001474 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001475 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001476
1477 def test_send_at_certain_offset(self):
1478 # start sending a file at a certain offset
1479 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001480 offset = len(self.DATA) // 2
1481 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001482 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001483 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001484 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1485 if sent == 0:
1486 break
1487 offset += sent
1488 total_sent += sent
1489 self.assertTrue(sent <= nbytes)
1490
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001491 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001492 self.client.close()
1493 self.server.wait()
1494 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001495 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001496 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001497 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001498 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001499
1500 def test_offset_overflow(self):
1501 # specify an offset > file size
1502 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001503 try:
1504 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1505 except OSError as e:
1506 # Solaris can raise EINVAL if offset >= file length, ignore.
1507 if e.errno != errno.EINVAL:
1508 raise
1509 else:
1510 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001511 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001512 self.client.close()
1513 self.server.wait()
1514 data = self.server.handler_instance.get_data()
1515 self.assertEqual(data, b'')
1516
1517 def test_invalid_offset(self):
1518 with self.assertRaises(OSError) as cm:
1519 os.sendfile(self.sockno, self.fileno, -1, 4096)
1520 self.assertEqual(cm.exception.errno, errno.EINVAL)
1521
1522 # --- headers / trailers tests
1523
1524 if SUPPORT_HEADERS_TRAILERS:
1525
1526 def test_headers(self):
1527 total_sent = 0
1528 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1529 headers=[b"x" * 512])
1530 total_sent += sent
1531 offset = 4096
1532 nbytes = 4096
1533 while 1:
1534 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1535 offset, nbytes)
1536 if sent == 0:
1537 break
1538 total_sent += sent
1539 offset += sent
1540
1541 expected_data = b"x" * 512 + self.DATA
1542 self.assertEqual(total_sent, len(expected_data))
1543 self.client.close()
1544 self.server.wait()
1545 data = self.server.handler_instance.get_data()
1546 self.assertEqual(hash(data), hash(expected_data))
1547
1548 def test_trailers(self):
1549 TESTFN2 = support.TESTFN + "2"
Brett Cannonb6376802011-03-15 17:38:22 -04001550 with open(TESTFN2, 'wb') as f:
1551 f.write(b"abcde")
1552 with open(TESTFN2, 'rb')as f:
1553 self.addCleanup(os.remove, TESTFN2)
1554 os.sendfile(self.sockno, f.fileno(), 0, 4096,
1555 trailers=[b"12345"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001556 self.client.close()
1557 self.server.wait()
1558 data = self.server.handler_instance.get_data()
1559 self.assertEqual(data, b"abcde12345")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001560
1561 if hasattr(os, "SF_NODISKIO"):
1562 def test_flags(self):
1563 try:
1564 os.sendfile(self.sockno, self.fileno, 0, 4096,
1565 flags=os.SF_NODISKIO)
1566 except OSError as err:
1567 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1568 raise
1569
1570
Fred Drake2e2be372001-09-20 21:33:42 +00001571def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001572 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00001573 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001574 StatAttributeTests,
1575 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001576 WalkTests,
1577 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001578 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00001579 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00001580 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001581 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001582 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00001583 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00001584 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001585 Win32KillTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00001586 Win32SymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00001587 FSEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001588 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001589 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00001590 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001591 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001592 ProgramPriorityTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001593 )
Fred Drake2e2be372001-09-20 21:33:42 +00001594
1595if __name__ == "__main__":
1596 test_main()