blob: a2e2ee805cd7e5c138193bb9869d400099b20229 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
17import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000018import asyncore
19import asynchat
20import socket
21try:
22 import threading
23except ImportError:
24 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000025
Mark Dickinson7cf03892010-04-16 13:45:35 +000026# Detect whether we're on a Linux system that uses the (now outdated
27# and unmaintained) linuxthreads threading library. There's an issue
28# when combining linuxthreads with a failed execv call: see
29# http://bugs.python.org/issue4970.
Mark Dickinson89589c92010-04-16 13:51:27 +000030if (hasattr(os, "confstr_names") and
31 "CS_GNU_LIBPTHREAD_VERSION" in os.confstr_names):
Mark Dickinson7cf03892010-04-16 13:45:35 +000032 libpthread = os.confstr("CS_GNU_LIBPTHREAD_VERSION")
33 USING_LINUXTHREADS= libpthread.startswith("linuxthreads")
34else:
35 USING_LINUXTHREADS= False
Brian Curtineb24d742010-04-12 17:16:38 +000036
Thomas Wouters0e3f5912006-08-11 14:57:12 +000037# Tests creating TESTFN
38class FileTests(unittest.TestCase):
39 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040 if os.path.exists(support.TESTFN):
41 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000042 tearDown = setUp
43
44 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000045 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000046 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000047 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000048
Christian Heimesfdab48e2008-01-20 09:06:41 +000049 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000050 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
51 # We must allocate two consecutive file descriptors, otherwise
52 # it will mess up other file descriptors (perhaps even the three
53 # standard ones).
54 second = os.dup(first)
55 try:
56 retries = 0
57 while second != first + 1:
58 os.close(first)
59 retries += 1
60 if retries > 10:
61 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000062 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000063 first, second = second, os.dup(second)
64 finally:
65 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000066 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000067 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000068 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000069
Benjamin Peterson1cc6df92010-06-30 17:39:45 +000070 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +000071 def test_rename(self):
72 path = support.TESTFN
73 old = sys.getrefcount(path)
74 self.assertRaises(TypeError, os.rename, path, 0)
75 new = sys.getrefcount(path)
76 self.assertEqual(old, new)
77
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000078 def test_read(self):
79 with open(support.TESTFN, "w+b") as fobj:
80 fobj.write(b"spam")
81 fobj.flush()
82 fd = fobj.fileno()
83 os.lseek(fd, 0, 0)
84 s = os.read(fd, 4)
85 self.assertEqual(type(s), bytes)
86 self.assertEqual(s, b"spam")
87
88 def test_write(self):
89 # os.write() accepts bytes- and buffer-like objects but not strings
90 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
91 self.assertRaises(TypeError, os.write, fd, "beans")
92 os.write(fd, b"bacon\n")
93 os.write(fd, bytearray(b"eggs\n"))
94 os.write(fd, memoryview(b"spam\n"))
95 os.close(fd)
96 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +000097 self.assertEqual(fobj.read().splitlines(),
98 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000099
100
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000101class TemporaryFileTests(unittest.TestCase):
102 def setUp(self):
103 self.files = []
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000104 os.mkdir(support.TESTFN)
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000105
106 def tearDown(self):
107 for name in self.files:
108 os.unlink(name)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000109 os.rmdir(support.TESTFN)
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000110
111 def check_tempfile(self, name):
112 # make sure it doesn't already exist:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000113 self.assertFalse(os.path.exists(name),
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000114 "file already exists for temporary file")
115 # make sure we can create the file
116 open(name, "w")
117 self.files.append(name)
118
119 def test_tempnam(self):
120 if not hasattr(os, "tempnam"):
121 return
122 warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
123 r"test_os$")
124 self.check_tempfile(os.tempnam())
125
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000126 name = os.tempnam(support.TESTFN)
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000127 self.check_tempfile(name)
128
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000129 name = os.tempnam(support.TESTFN, "pfx")
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000130 self.assertTrue(os.path.basename(name)[:3] == "pfx")
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000131 self.check_tempfile(name)
132
133 def test_tmpfile(self):
134 if not hasattr(os, "tmpfile"):
135 return
136 # As with test_tmpnam() below, the Windows implementation of tmpfile()
137 # attempts to create a file in the root directory of the current drive.
138 # On Vista and Server 2008, this test will always fail for normal users
139 # as writing to the root directory requires elevated privileges. With
140 # XP and below, the semantics of tmpfile() are the same, but the user
141 # running the test is more likely to have administrative privileges on
142 # their account already. If that's the case, then os.tmpfile() should
143 # work. In order to make this test as useful as possible, rather than
144 # trying to detect Windows versions or whether or not the user has the
145 # right permissions, just try and create a file in the root directory
146 # and see if it raises a 'Permission denied' OSError. If it does, then
147 # test that a subsequent call to os.tmpfile() raises the same error. If
148 # it doesn't, assume we're on XP or below and the user running the test
149 # has administrative privileges, and proceed with the test as normal.
150 if sys.platform == 'win32':
151 name = '\\python_test_os_test_tmpfile.txt'
152 if os.path.exists(name):
153 os.remove(name)
154 try:
155 fp = open(name, 'w')
156 except IOError as first:
157 # open() failed, assert tmpfile() fails in the same way.
158 # Although open() raises an IOError and os.tmpfile() raises an
159 # OSError(), 'args' will be (13, 'Permission denied') in both
160 # cases.
161 try:
162 fp = os.tmpfile()
163 except OSError as second:
164 self.assertEqual(first.args, second.args)
165 else:
166 self.fail("expected os.tmpfile() to raise OSError")
167 return
168 else:
169 # open() worked, therefore, tmpfile() should work. Close our
170 # dummy file and proceed with the test as normal.
171 fp.close()
172 os.remove(name)
173
174 fp = os.tmpfile()
175 fp.write("foobar")
176 fp.seek(0,0)
177 s = fp.read()
178 fp.close()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000179 self.assertTrue(s == "foobar")
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000180
181 def test_tmpnam(self):
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000182 if not hasattr(os, "tmpnam"):
183 return
184 warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
185 r"test_os$")
186 name = os.tmpnam()
187 if sys.platform in ("win32",):
188 # The Windows tmpnam() seems useless. From the MS docs:
189 #
190 # The character string that tmpnam creates consists of
191 # the path prefix, defined by the entry P_tmpdir in the
192 # file STDIO.H, followed by a sequence consisting of the
193 # digit characters '0' through '9'; the numerical value
194 # of this string is in the range 1 - 65,535. Changing the
195 # definitions of L_tmpnam or P_tmpdir in STDIO.H does not
196 # change the operation of tmpnam.
197 #
198 # The really bizarre part is that, at least under MSVC6,
199 # P_tmpdir is "\\". That is, the path returned refers to
200 # the root of the current drive. That's a terrible place to
201 # put temp files, and, depending on privileges, the user
202 # may not even be able to open a file in the root directory.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000203 self.assertFalse(os.path.exists(name),
Christian Heimesdd15f6c2008-03-16 00:07:10 +0000204 "file already exists for temporary file")
205 else:
206 self.check_tempfile(name)
207
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000208 def fdopen_helper(self, *args):
209 fd = os.open(support.TESTFN, os.O_RDONLY)
210 fp2 = os.fdopen(fd, *args)
211 fp2.close()
212
213 def test_fdopen(self):
214 self.fdopen_helper()
215 self.fdopen_helper('r')
216 self.fdopen_helper('r', 100)
217
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000218# Test attributes on return values from os.*stat* family.
219class StatAttributeTests(unittest.TestCase):
220 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000221 os.mkdir(support.TESTFN)
222 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000223 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000224 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000225 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000226
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000227 def tearDown(self):
228 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000229 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000230
Antoine Pitrou38425292010-09-21 18:19:07 +0000231 def check_stat_attributes(self, fname):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000232 if not hasattr(os, "stat"):
233 return
234
235 import stat
Antoine Pitrou38425292010-09-21 18:19:07 +0000236 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000237
238 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000239 self.assertEqual(result[stat.ST_SIZE], 3)
240 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000241
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000242 # Make sure all the attributes are there
243 members = dir(result)
244 for name in dir(stat):
245 if name[:3] == 'ST_':
246 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000247 if name.endswith("TIME"):
248 def trunc(x): return int(x)
249 else:
250 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000251 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000252 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000253 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000254
255 try:
256 result[200]
257 self.fail("No exception thrown")
258 except IndexError:
259 pass
260
261 # Make sure that assignment fails
262 try:
263 result.st_mode = 1
264 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000265 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000266 pass
267
268 try:
269 result.st_rdev = 1
270 self.fail("No exception thrown")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000271 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000272 pass
273
274 try:
275 result.parrot = 1
276 self.fail("No exception thrown")
277 except AttributeError:
278 pass
279
280 # Use the stat_result constructor with a too-short tuple.
281 try:
282 result2 = os.stat_result((10,))
283 self.fail("No exception thrown")
284 except TypeError:
285 pass
286
287 # Use the constructr with a too-long tuple.
288 try:
289 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
290 except TypeError:
291 pass
292
Antoine Pitrou38425292010-09-21 18:19:07 +0000293 def test_stat_attributes(self):
294 self.check_stat_attributes(self.fname)
295
296 def test_stat_attributes_bytes(self):
297 try:
298 fname = self.fname.encode(sys.getfilesystemencoding())
299 except UnicodeEncodeError:
300 self.skipTest("cannot encode %a for the filesystem" % self.fname)
301 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000302
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000303 def test_statvfs_attributes(self):
304 if not hasattr(os, "statvfs"):
305 return
306
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000307 try:
308 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000309 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000310 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000311 if e.errno == errno.ENOSYS:
312 return
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000313
314 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000315 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000316
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000317 # Make sure all the attributes are there.
318 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
319 'ffree', 'favail', 'flag', 'namemax')
320 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000321 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000322
323 # Make sure that assignment really fails
324 try:
325 result.f_bfree = 1
326 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000327 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000328 pass
329
330 try:
331 result.parrot = 1
332 self.fail("No exception thrown")
333 except AttributeError:
334 pass
335
336 # Use the constructor with a too-short tuple.
337 try:
338 result2 = os.statvfs_result((10,))
339 self.fail("No exception thrown")
340 except TypeError:
341 pass
342
343 # Use the constructr with a too-long tuple.
344 try:
345 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
346 except TypeError:
347 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000348
Thomas Wouters89f507f2006-12-13 04:49:30 +0000349 def test_utime_dir(self):
350 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000351 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000352 # round to int, because some systems may support sub-second
353 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000354 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
355 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000356 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000357
358 # Restrict test to Win32, since there is no guarantee other
359 # systems support centiseconds
360 if sys.platform == 'win32':
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000361 def get_file_system(path):
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000362 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000363 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000364 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000365 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000366 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000367 return buf.value
368
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000369 if get_file_system(support.TESTFN) == "NTFS":
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000370 def test_1565150(self):
371 t1 = 1159195039.25
372 os.utime(self.fname, (t1, t1))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000373 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000374
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000375 def test_large_time(self):
376 t1 = 5000000000 # some day in 2128
377 os.utime(self.fname, (t1, t1))
378 self.assertEqual(os.stat(self.fname).st_mtime, t1)
379
Guido van Rossumd8faa362007-04-27 19:54:29 +0000380 def test_1686475(self):
381 # Verify that an open file can be stat'ed
382 try:
383 os.stat(r"c:\pagefile.sys")
384 except WindowsError as e:
Benjamin Petersonc4fe6f32008-08-19 18:57:56 +0000385 if e.errno == 2: # file does not exist; cannot run test
Guido van Rossumd8faa362007-04-27 19:54:29 +0000386 return
387 self.fail("Could not stat pagefile.sys")
388
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000389from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000390
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000391class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000392 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000393 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000394
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000395 def setUp(self):
396 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000397 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000398 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000399 for key, value in self._reference().items():
400 os.environ[key] = value
401
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000402 def tearDown(self):
403 os.environ.clear()
404 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000405 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000406 os.environb.clear()
407 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000408
Christian Heimes90333392007-11-01 19:08:42 +0000409 def _reference(self):
410 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
411
412 def _empty_mapping(self):
413 os.environ.clear()
414 return os.environ
415
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000416 # Bug 1110478
Martin v. Löwis5510f652005-02-17 21:23:20 +0000417 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000418 os.environ.clear()
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000419 if os.path.exists("/bin/sh"):
420 os.environ.update(HELLO="World")
Brian Curtin810921b2010-10-30 21:24:21 +0000421 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
422 value = popen.read().strip()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000423 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000424
Christian Heimes1a13d592007-11-08 14:16:55 +0000425 def test_os_popen_iter(self):
426 if os.path.exists("/bin/sh"):
Brian Curtin810921b2010-10-30 21:24:21 +0000427 with os.popen(
428 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
429 it = iter(popen)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000430 self.assertEqual(next(it), "line1\n")
431 self.assertEqual(next(it), "line2\n")
432 self.assertEqual(next(it), "line3\n")
Brian Curtin810921b2010-10-30 21:24:21 +0000433 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000434
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000435 # Verify environ keys and values from the OS are of the
436 # correct str type.
437 def test_keyvalue_types(self):
438 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000439 self.assertEqual(type(key), str)
440 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000441
Christian Heimes90333392007-11-01 19:08:42 +0000442 def test_items(self):
443 for key, value in self._reference().items():
444 self.assertEqual(os.environ.get(key), value)
445
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000446 # Issue 7310
447 def test___repr__(self):
448 """Check that the repr() of os.environ looks like environ({...})."""
449 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000450 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
451 '{!r}: {!r}'.format(key, value)
452 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000453
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000454 def test_get_exec_path(self):
455 defpath_list = os.defpath.split(os.pathsep)
456 test_path = ['/monty', '/python', '', '/flying/circus']
457 test_env = {'PATH': os.pathsep.join(test_path)}
458
459 saved_environ = os.environ
460 try:
461 os.environ = dict(test_env)
462 # Test that defaulting to os.environ works.
463 self.assertSequenceEqual(test_path, os.get_exec_path())
464 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
465 finally:
466 os.environ = saved_environ
467
468 # No PATH environment variable
469 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
470 # Empty PATH environment variable
471 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
472 # Supplied PATH environment variable
473 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
474
Victor Stinnerb745a742010-05-18 17:17:23 +0000475 if os.supports_bytes_environ:
476 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000477 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000478 # ignore BytesWarning warning
479 with warnings.catch_warnings(record=True):
480 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000481 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000482 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000483 pass
484 else:
485 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000486
487 # bytes key and/or value
488 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
489 ['abc'])
490 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
491 ['abc'])
492 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
493 ['abc'])
494
495 @unittest.skipUnless(os.supports_bytes_environ,
496 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000497 def test_environb(self):
498 # os.environ -> os.environb
499 value = 'euro\u20ac'
500 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000501 value_bytes = value.encode(sys.getfilesystemencoding(),
502 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000503 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000504 msg = "U+20AC character is not encodable to %s" % (
505 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000506 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000507 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000508 self.assertEqual(os.environ['unicode'], value)
509 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000510
511 # os.environb -> os.environ
512 value = b'\xff'
513 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000514 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000515 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000516 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000517
Tim Petersc4e09402003-04-25 07:11:48 +0000518class WalkTests(unittest.TestCase):
519 """Tests for os.walk()."""
520
521 def test_traversal(self):
522 import os
523 from os.path import join
524
525 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000526 # TESTFN/
527 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000528 # tmp1
529 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000530 # tmp2
531 # SUB11/ no kids
532 # SUB2/ a file kid and a dirsymlink kid
533 # tmp3
534 # link/ a symlink to TESTFN.2
535 # TEST2/
536 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000537 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000538 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000539 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000540 sub2_path = join(walk_path, "SUB2")
541 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000542 tmp2_path = join(sub1_path, "tmp2")
543 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000544 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000545 t2_path = join(support.TESTFN, "TEST2")
546 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Tim Petersc4e09402003-04-25 07:11:48 +0000547
548 # Create stuff.
549 os.makedirs(sub11_path)
550 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000551 os.makedirs(t2_path)
552 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000553 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000554 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
555 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000556 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000557 os.symlink(os.path.abspath(t2_path), link_path)
558 sub2_tree = (sub2_path, ["link"], ["tmp3"])
559 else:
560 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000561
562 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000563 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000564 self.assertEqual(len(all), 4)
565 # We can't know which order SUB1 and SUB2 will appear in.
566 # Not flipped: TESTFN, SUB1, SUB11, SUB2
567 # flipped: TESTFN, SUB2, SUB1, SUB11
568 flipped = all[0][1][0] != "SUB1"
569 all[0][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000570 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000571 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
572 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000573 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000574
575 # Prune the search.
576 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000577 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000578 all.append((root, dirs, files))
579 # Don't descend into SUB1.
580 if 'SUB1' in dirs:
581 # Note that this also mutates the dirs we appended to all!
582 dirs.remove('SUB1')
583 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000584 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
585 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000586
587 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000588 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000589 self.assertEqual(len(all), 4)
590 # We can't know which order SUB1 and SUB2 will appear in.
591 # Not flipped: SUB11, SUB1, SUB2, TESTFN
592 # flipped: SUB2, SUB11, SUB1, TESTFN
593 flipped = all[3][1][0] != "SUB1"
594 all[3][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000595 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000596 self.assertEqual(all[flipped], (sub11_path, [], []))
597 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000598 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000599
Brian Curtin3b4499c2010-12-28 14:31:47 +0000600 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000601 # Walk, following symlinks.
602 for root, dirs, files in os.walk(walk_path, followlinks=True):
603 if root == link_path:
604 self.assertEqual(dirs, [])
605 self.assertEqual(files, ["tmp4"])
606 break
607 else:
608 self.fail("Didn't follow symlink with followlinks=True")
609
610 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000611 # Tear everything down. This is a decent use for bottom-up on
612 # Windows, which doesn't have a recursive delete command. The
613 # (not so) subtlety is that rmdir will fail unless the dir's
614 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000615 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000616 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000617 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000618 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000619 dirname = os.path.join(root, name)
620 if not os.path.islink(dirname):
621 os.rmdir(dirname)
622 else:
623 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000624 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000625
Guido van Rossume7ba4952007-06-06 23:52:48 +0000626class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000627 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000628 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000629
630 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000631 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000632 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
633 os.makedirs(path) # Should work
634 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
635 os.makedirs(path)
636
637 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000638 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000639 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
640 os.makedirs(path)
641 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
642 'dir5', 'dir6')
643 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000644
Terry Reedy5a22b652010-12-02 07:05:56 +0000645 def test_exist_ok_existing_directory(self):
646 path = os.path.join(support.TESTFN, 'dir1')
647 mode = 0o777
648 old_mask = os.umask(0o022)
649 os.makedirs(path, mode)
650 self.assertRaises(OSError, os.makedirs, path, mode)
651 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
652 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
653 os.makedirs(path, mode=mode, exist_ok=True)
654 os.umask(old_mask)
655
656 def test_exist_ok_existing_regular_file(self):
657 base = support.TESTFN
658 path = os.path.join(support.TESTFN, 'dir1')
659 f = open(path, 'w')
660 f.write('abc')
661 f.close()
662 self.assertRaises(OSError, os.makedirs, path)
663 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
664 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
665 os.remove(path)
666
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000667 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000668 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000669 'dir4', 'dir5', 'dir6')
670 # If the tests failed, the bottom-most directory ('../dir6')
671 # may not have been created, so we look for the outermost directory
672 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000673 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000674 path = os.path.dirname(path)
675
676 os.removedirs(path)
677
Guido van Rossume7ba4952007-06-06 23:52:48 +0000678class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000679 def test_devnull(self):
Alex Martelli01c77c62006-08-24 02:58:11 +0000680 f = open(os.devnull, 'w')
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000681 f.write('hello')
682 f.close()
Alex Martelli01c77c62006-08-24 02:58:11 +0000683 f = open(os.devnull, 'r')
Tim Peters4182cfd2004-06-08 20:34:34 +0000684 self.assertEqual(f.read(), '')
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000685 f.close()
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000686
Guido van Rossume7ba4952007-06-06 23:52:48 +0000687class URandomTests(unittest.TestCase):
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000688 def test_urandom(self):
689 try:
690 self.assertEqual(len(os.urandom(1)), 1)
691 self.assertEqual(len(os.urandom(10)), 10)
692 self.assertEqual(len(os.urandom(100)), 100)
693 self.assertEqual(len(os.urandom(1000)), 1000)
694 except NotImplementedError:
695 pass
696
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000697@contextlib.contextmanager
698def _execvpe_mockup(defpath=None):
699 """
700 Stubs out execv and execve functions when used as context manager.
701 Records exec calls. The mock execv and execve functions always raise an
702 exception as they would normally never return.
703 """
704 # A list of tuples containing (function name, first arg, args)
705 # of calls to execv or execve that have been made.
706 calls = []
707
708 def mock_execv(name, *args):
709 calls.append(('execv', name, args))
710 raise RuntimeError("execv called")
711
712 def mock_execve(name, *args):
713 calls.append(('execve', name, args))
714 raise OSError(errno.ENOTDIR, "execve called")
715
716 try:
717 orig_execv = os.execv
718 orig_execve = os.execve
719 orig_defpath = os.defpath
720 os.execv = mock_execv
721 os.execve = mock_execve
722 if defpath is not None:
723 os.defpath = defpath
724 yield calls
725 finally:
726 os.execv = orig_execv
727 os.execve = orig_execve
728 os.defpath = orig_defpath
729
Guido van Rossume7ba4952007-06-06 23:52:48 +0000730class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000731 @unittest.skipIf(USING_LINUXTHREADS,
732 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +0000733 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000734 self.assertRaises(OSError, os.execvpe, 'no such app-',
735 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +0000736
Thomas Heller6790d602007-08-30 17:15:14 +0000737 def test_execvpe_with_bad_arglist(self):
738 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
739
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000740 @unittest.skipUnless(hasattr(os, '_execvpe'),
741 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +0000742 def _test_internal_execvpe(self, test_type):
743 program_path = os.sep + 'absolutepath'
744 if test_type is bytes:
745 program = b'executable'
746 fullpath = os.path.join(os.fsencode(program_path), program)
747 native_fullpath = fullpath
748 arguments = [b'progname', 'arg1', 'arg2']
749 else:
750 program = 'executable'
751 arguments = ['progname', 'arg1', 'arg2']
752 fullpath = os.path.join(program_path, program)
753 if os.name != "nt":
754 native_fullpath = os.fsencode(fullpath)
755 else:
756 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000757 env = {'spam': 'beans'}
758
Victor Stinnerb745a742010-05-18 17:17:23 +0000759 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000760 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000761 self.assertRaises(RuntimeError,
762 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000763 self.assertEqual(len(calls), 1)
764 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
765
Victor Stinnerb745a742010-05-18 17:17:23 +0000766 # test os._execvpe() with a relative path:
767 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000768 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000769 self.assertRaises(OSError,
770 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000771 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +0000772 self.assertSequenceEqual(calls[0],
773 ('execve', native_fullpath, (arguments, env)))
774
775 # test os._execvpe() with a relative path:
776 # os.get_exec_path() reads the 'PATH' variable
777 with _execvpe_mockup() as calls:
778 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +0000779 if test_type is bytes:
780 env_path[b'PATH'] = program_path
781 else:
782 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +0000783 self.assertRaises(OSError,
784 os._execvpe, program, arguments, env=env_path)
785 self.assertEqual(len(calls), 1)
786 self.assertSequenceEqual(calls[0],
787 ('execve', native_fullpath, (arguments, env_path)))
788
789 def test_internal_execvpe_str(self):
790 self._test_internal_execvpe(str)
791 if os.name != "nt":
792 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000793
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000794
Thomas Wouters477c8d52006-05-27 19:21:47 +0000795class Win32ErrorTests(unittest.TestCase):
796 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000797 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000798
799 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000800 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000801
802 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000803 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000804
805 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000806 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +0000807 try:
808 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
809 finally:
810 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000811 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000812
813 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000814 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000815
Thomas Wouters477c8d52006-05-27 19:21:47 +0000816 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +0000817 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000818
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000819class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000820 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000821 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
822 #singles.append("close")
823 #We omit close because it doesn'r raise an exception on some platforms
824 def get_single(f):
825 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000826 if hasattr(os, f):
827 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000828 return helper
829 for f in singles:
830 locals()["test_"+f] = get_single(f)
831
Benjamin Peterson7522c742009-01-19 21:00:09 +0000832 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000833 try:
834 f(support.make_bad_fd(), *args)
835 except OSError as e:
836 self.assertEqual(e.errno, errno.EBADF)
837 else:
838 self.fail("%r didn't raise a OSError with a bad file descriptor"
839 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000840
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000841 def test_isatty(self):
842 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000843 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000844
845 def test_closerange(self):
846 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000847 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000848 # Make sure none of the descriptors we are about to close are
849 # currently valid (issue 6542).
850 for i in range(10):
851 try: os.fstat(fd+i)
852 except OSError:
853 pass
854 else:
855 break
856 if i < 2:
857 raise unittest.SkipTest(
858 "Unable to acquire a range of invalid file descriptors")
859 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000860
861 def test_dup2(self):
862 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000863 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000864
865 def test_fchmod(self):
866 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000867 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000868
869 def test_fchown(self):
870 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000871 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000872
873 def test_fpathconf(self):
874 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000875 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000876
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000877 def test_ftruncate(self):
878 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000879 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000880
881 def test_lseek(self):
882 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000883 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000884
885 def test_read(self):
886 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000887 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000888
889 def test_tcsetpgrpt(self):
890 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000891 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000892
893 def test_write(self):
894 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000895 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000896
Brian Curtin1b9df392010-11-24 20:24:31 +0000897
898class LinkTests(unittest.TestCase):
899 def setUp(self):
900 self.file1 = support.TESTFN
901 self.file2 = os.path.join(support.TESTFN + "2")
902
Brian Curtinc0abc4e2010-11-30 23:46:54 +0000903 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +0000904 for file in (self.file1, self.file2):
905 if os.path.exists(file):
906 os.unlink(file)
907
Brian Curtin1b9df392010-11-24 20:24:31 +0000908 def _test_link(self, file1, file2):
909 with open(file1, "w") as f1:
910 f1.write("test")
911
912 os.link(file1, file2)
913 with open(file1, "r") as f1, open(file2, "r") as f2:
914 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
915
916 def test_link(self):
917 self._test_link(self.file1, self.file2)
918
919 def test_link_bytes(self):
920 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
921 bytes(self.file2, sys.getfilesystemencoding()))
922
Brian Curtinf498b752010-11-30 15:54:04 +0000923 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +0000924 try:
Brian Curtinf498b752010-11-30 15:54:04 +0000925 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +0000926 except UnicodeError:
927 raise unittest.SkipTest("Unable to encode for this platform.")
928
Brian Curtinf498b752010-11-30 15:54:04 +0000929 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +0000930 self.file2 = self.file1 + "2"
931 self._test_link(self.file1, self.file2)
932
Thomas Wouters477c8d52006-05-27 19:21:47 +0000933if sys.platform != 'win32':
934 class Win32ErrorTests(unittest.TestCase):
935 pass
936
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000937 class PosixUidGidTests(unittest.TestCase):
938 if hasattr(os, 'setuid'):
939 def test_setuid(self):
940 if os.getuid() != 0:
941 self.assertRaises(os.error, os.setuid, 0)
942 self.assertRaises(OverflowError, os.setuid, 1<<32)
943
944 if hasattr(os, 'setgid'):
945 def test_setgid(self):
946 if os.getuid() != 0:
947 self.assertRaises(os.error, os.setgid, 0)
948 self.assertRaises(OverflowError, os.setgid, 1<<32)
949
950 if hasattr(os, 'seteuid'):
951 def test_seteuid(self):
952 if os.getuid() != 0:
953 self.assertRaises(os.error, os.seteuid, 0)
954 self.assertRaises(OverflowError, os.seteuid, 1<<32)
955
956 if hasattr(os, 'setegid'):
957 def test_setegid(self):
958 if os.getuid() != 0:
959 self.assertRaises(os.error, os.setegid, 0)
960 self.assertRaises(OverflowError, os.setegid, 1<<32)
961
962 if hasattr(os, 'setreuid'):
963 def test_setreuid(self):
964 if os.getuid() != 0:
965 self.assertRaises(os.error, os.setreuid, 0, 0)
966 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
967 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000968
969 def test_setreuid_neg1(self):
970 # Needs to accept -1. We run this in a subprocess to avoid
971 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000972 subprocess.check_call([
973 sys.executable, '-c',
974 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000975
976 if hasattr(os, 'setregid'):
977 def test_setregid(self):
978 if os.getuid() != 0:
979 self.assertRaises(os.error, os.setregid, 0, 0)
980 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
981 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000982
983 def test_setregid_neg1(self):
984 # Needs to accept -1. We run this in a subprocess to avoid
985 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000986 subprocess.check_call([
987 sys.executable, '-c',
988 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +0000989
990 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +0000991 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000992 if support.TESTFN_UNENCODABLE:
993 self.dir = support.TESTFN_UNENCODABLE
994 else:
995 self.dir = support.TESTFN
996 self.bdir = os.fsencode(self.dir)
997
998 bytesfn = []
999 def add_filename(fn):
1000 try:
1001 fn = os.fsencode(fn)
1002 except UnicodeEncodeError:
1003 return
1004 bytesfn.append(fn)
1005 add_filename(support.TESTFN_UNICODE)
1006 if support.TESTFN_UNENCODABLE:
1007 add_filename(support.TESTFN_UNENCODABLE)
1008 if not bytesfn:
1009 self.skipTest("couldn't create any non-ascii filename")
1010
1011 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001012 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001013 try:
1014 for fn in bytesfn:
1015 f = open(os.path.join(self.bdir, fn), "w")
1016 f.close()
Victor Stinnere8d51452010-08-19 01:05:19 +00001017 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001018 if fn in self.unicodefn:
1019 raise ValueError("duplicate filename")
1020 self.unicodefn.add(fn)
1021 except:
1022 shutil.rmtree(self.dir)
1023 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001024
1025 def tearDown(self):
1026 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001027
1028 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001029 expected = self.unicodefn
1030 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001031 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001032
1033 def test_open(self):
1034 for fn in self.unicodefn:
1035 f = open(os.path.join(self.dir, fn))
1036 f.close()
1037
1038 def test_stat(self):
1039 for fn in self.unicodefn:
1040 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001041else:
1042 class PosixUidGidTests(unittest.TestCase):
1043 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001044 class Pep383Tests(unittest.TestCase):
1045 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001046
Brian Curtineb24d742010-04-12 17:16:38 +00001047@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1048class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001049 def _kill(self, sig):
1050 # Start sys.executable as a subprocess and communicate from the
1051 # subprocess to the parent that the interpreter is ready. When it
1052 # becomes ready, send *sig* via os.kill to the subprocess and check
1053 # that the return code is equal to *sig*.
1054 import ctypes
1055 from ctypes import wintypes
1056 import msvcrt
1057
1058 # Since we can't access the contents of the process' stdout until the
1059 # process has exited, use PeekNamedPipe to see what's inside stdout
1060 # without waiting. This is done so we can tell that the interpreter
1061 # is started and running at a point where it could handle a signal.
1062 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1063 PeekNamedPipe.restype = wintypes.BOOL
1064 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1065 ctypes.POINTER(ctypes.c_char), # stdout buf
1066 wintypes.DWORD, # Buffer size
1067 ctypes.POINTER(wintypes.DWORD), # bytes read
1068 ctypes.POINTER(wintypes.DWORD), # bytes avail
1069 ctypes.POINTER(wintypes.DWORD)) # bytes left
1070 msg = "running"
1071 proc = subprocess.Popen([sys.executable, "-c",
1072 "import sys;"
1073 "sys.stdout.write('{}');"
1074 "sys.stdout.flush();"
1075 "input()".format(msg)],
1076 stdout=subprocess.PIPE,
1077 stderr=subprocess.PIPE,
1078 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001079 self.addCleanup(proc.stdout.close)
1080 self.addCleanup(proc.stderr.close)
1081 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001082
1083 count, max = 0, 100
1084 while count < max and proc.poll() is None:
1085 # Create a string buffer to store the result of stdout from the pipe
1086 buf = ctypes.create_string_buffer(len(msg))
1087 # Obtain the text currently in proc.stdout
1088 # Bytes read/avail/left are left as NULL and unused
1089 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1090 buf, ctypes.sizeof(buf), None, None, None)
1091 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1092 if buf.value:
1093 self.assertEqual(msg, buf.value.decode())
1094 break
1095 time.sleep(0.1)
1096 count += 1
1097 else:
1098 self.fail("Did not receive communication from the subprocess")
1099
Brian Curtineb24d742010-04-12 17:16:38 +00001100 os.kill(proc.pid, sig)
1101 self.assertEqual(proc.wait(), sig)
1102
1103 def test_kill_sigterm(self):
1104 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001105 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001106
1107 def test_kill_int(self):
1108 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001109 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001110
1111 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001112 tagname = "test_os_%s" % uuid.uuid1()
1113 m = mmap.mmap(-1, 1, tagname)
1114 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001115 # Run a script which has console control handling enabled.
1116 proc = subprocess.Popen([sys.executable,
1117 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001118 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001119 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1120 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001121 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001122 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001123 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001124 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001125 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001126 count += 1
1127 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001128 # Forcefully kill the process if we weren't able to signal it.
1129 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001130 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001131 os.kill(proc.pid, event)
1132 # proc.send_signal(event) could also be done here.
1133 # Allow time for the signal to be passed and the process to exit.
1134 time.sleep(0.5)
1135 if not proc.poll():
1136 # Forcefully kill the process if we weren't able to signal it.
1137 os.kill(proc.pid, signal.SIGINT)
1138 self.fail("subprocess did not stop on {}".format(name))
1139
1140 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1141 def test_CTRL_C_EVENT(self):
1142 from ctypes import wintypes
1143 import ctypes
1144
1145 # Make a NULL value by creating a pointer with no argument.
1146 NULL = ctypes.POINTER(ctypes.c_int)()
1147 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1148 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1149 wintypes.BOOL)
1150 SetConsoleCtrlHandler.restype = wintypes.BOOL
1151
1152 # Calling this with NULL and FALSE causes the calling process to
1153 # handle CTRL+C, rather than ignore it. This property is inherited
1154 # by subprocesses.
1155 SetConsoleCtrlHandler(NULL, 0)
1156
1157 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1158
1159 def test_CTRL_BREAK_EVENT(self):
1160 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1161
1162
Brian Curtind40e6f72010-07-08 21:39:08 +00001163@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001164@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001165class Win32SymlinkTests(unittest.TestCase):
1166 filelink = 'filelinktest'
1167 filelink_target = os.path.abspath(__file__)
1168 dirlink = 'dirlinktest'
1169 dirlink_target = os.path.dirname(filelink_target)
1170 missing_link = 'missing link'
1171
1172 def setUp(self):
1173 assert os.path.exists(self.dirlink_target)
1174 assert os.path.exists(self.filelink_target)
1175 assert not os.path.exists(self.dirlink)
1176 assert not os.path.exists(self.filelink)
1177 assert not os.path.exists(self.missing_link)
1178
1179 def tearDown(self):
1180 if os.path.exists(self.filelink):
1181 os.remove(self.filelink)
1182 if os.path.exists(self.dirlink):
1183 os.rmdir(self.dirlink)
1184 if os.path.lexists(self.missing_link):
1185 os.remove(self.missing_link)
1186
1187 def test_directory_link(self):
1188 os.symlink(self.dirlink_target, self.dirlink)
1189 self.assertTrue(os.path.exists(self.dirlink))
1190 self.assertTrue(os.path.isdir(self.dirlink))
1191 self.assertTrue(os.path.islink(self.dirlink))
1192 self.check_stat(self.dirlink, self.dirlink_target)
1193
1194 def test_file_link(self):
1195 os.symlink(self.filelink_target, self.filelink)
1196 self.assertTrue(os.path.exists(self.filelink))
1197 self.assertTrue(os.path.isfile(self.filelink))
1198 self.assertTrue(os.path.islink(self.filelink))
1199 self.check_stat(self.filelink, self.filelink_target)
1200
1201 def _create_missing_dir_link(self):
1202 'Create a "directory" link to a non-existent target'
1203 linkname = self.missing_link
1204 if os.path.lexists(linkname):
1205 os.remove(linkname)
1206 target = r'c:\\target does not exist.29r3c740'
1207 assert not os.path.exists(target)
1208 target_is_dir = True
1209 os.symlink(target, linkname, target_is_dir)
1210
1211 def test_remove_directory_link_to_missing_target(self):
1212 self._create_missing_dir_link()
1213 # For compatibility with Unix, os.remove will check the
1214 # directory status and call RemoveDirectory if the symlink
1215 # was created with target_is_dir==True.
1216 os.remove(self.missing_link)
1217
1218 @unittest.skip("currently fails; consider for improvement")
1219 def test_isdir_on_directory_link_to_missing_target(self):
1220 self._create_missing_dir_link()
1221 # consider having isdir return true for directory links
1222 self.assertTrue(os.path.isdir(self.missing_link))
1223
1224 @unittest.skip("currently fails; consider for improvement")
1225 def test_rmdir_on_directory_link_to_missing_target(self):
1226 self._create_missing_dir_link()
1227 # consider allowing rmdir to remove directory links
1228 os.rmdir(self.missing_link)
1229
1230 def check_stat(self, link, target):
1231 self.assertEqual(os.stat(link), os.stat(target))
1232 self.assertNotEqual(os.lstat(link), os.stat(link))
1233
1234
Victor Stinnere8d51452010-08-19 01:05:19 +00001235class FSEncodingTests(unittest.TestCase):
1236 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001237 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1238 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001239
Victor Stinnere8d51452010-08-19 01:05:19 +00001240 def test_identity(self):
1241 # assert fsdecode(fsencode(x)) == x
1242 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1243 try:
1244 bytesfn = os.fsencode(fn)
1245 except UnicodeEncodeError:
1246 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001247 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001248
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001249
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001250class PidTests(unittest.TestCase):
1251 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1252 def test_getppid(self):
1253 p = subprocess.Popen([sys.executable, '-c',
1254 'import os; print(os.getppid())'],
1255 stdout=subprocess.PIPE)
1256 stdout, _ = p.communicate()
1257 # We are the parent of our subprocess
1258 self.assertEqual(int(stdout), os.getpid())
1259
1260
Brian Curtin0151b8e2010-09-24 13:43:43 +00001261# The introduction of this TestCase caused at least two different errors on
1262# *nix buildbots. Temporarily skip this to let the buildbots move along.
1263@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001264@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1265class LoginTests(unittest.TestCase):
1266 def test_getlogin(self):
1267 user_name = os.getlogin()
1268 self.assertNotEqual(len(user_name), 0)
1269
1270
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001271@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1272 "needs os.getpriority and os.setpriority")
1273class ProgramPriorityTests(unittest.TestCase):
1274 """Tests for os.getpriority() and os.setpriority()."""
1275
1276 def test_set_get_priority(self):
1277 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1278 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1279 try:
1280 self.assertEqual(os.getpriority(os.PRIO_PROCESS, os.getpid()), base + 1)
1281 finally:
1282 try:
1283 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1284 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001285 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001286 raise
1287
1288
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001289class SendfileTestServer(asyncore.dispatcher, threading.Thread):
1290
1291 class Handler(asynchat.async_chat):
1292
1293 def __init__(self, conn):
1294 asynchat.async_chat.__init__(self, conn)
1295 self.in_buffer = []
1296 self.closed = False
1297 self.push(b"220 ready\r\n")
1298
1299 def handle_read(self):
1300 data = self.recv(4096)
1301 self.in_buffer.append(data)
1302
1303 def get_data(self):
1304 return b''.join(self.in_buffer)
1305
1306 def handle_close(self):
1307 self.close()
1308 self.closed = True
1309
1310 def handle_error(self):
1311 raise
1312
1313 def __init__(self, address):
1314 threading.Thread.__init__(self)
1315 asyncore.dispatcher.__init__(self)
1316 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1317 self.bind(address)
1318 self.listen(5)
1319 self.host, self.port = self.socket.getsockname()[:2]
1320 self.handler_instance = None
1321 self._active = False
1322 self._active_lock = threading.Lock()
1323
1324 # --- public API
1325
1326 @property
1327 def running(self):
1328 return self._active
1329
1330 def start(self):
1331 assert not self.running
1332 self.__flag = threading.Event()
1333 threading.Thread.start(self)
1334 self.__flag.wait()
1335
1336 def stop(self):
1337 assert self.running
1338 self._active = False
1339 self.join()
1340
1341 def wait(self):
1342 # wait for handler connection to be closed, then stop the server
1343 while not getattr(self.handler_instance, "closed", True):
1344 time.sleep(0.001)
1345 self.stop()
1346
1347 # --- internals
1348
1349 def run(self):
1350 self._active = True
1351 self.__flag.set()
1352 while self._active and asyncore.socket_map:
1353 self._active_lock.acquire()
1354 asyncore.loop(timeout=0.001, count=1)
1355 self._active_lock.release()
1356 asyncore.close_all()
1357
1358 def handle_accept(self):
1359 conn, addr = self.accept()
1360 self.handler_instance = self.Handler(conn)
1361
1362 def handle_connect(self):
1363 self.close()
1364 handle_read = handle_connect
1365
1366 def writable(self):
1367 return 0
1368
1369 def handle_error(self):
1370 raise
1371
1372
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001373@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001374@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1375class TestSendfile(unittest.TestCase):
1376
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001377 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001378 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001379 not sys.platform.startswith("solaris") and \
1380 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001381
1382 @classmethod
1383 def setUpClass(cls):
1384 with open(support.TESTFN, "wb") as f:
1385 f.write(cls.DATA)
1386
1387 @classmethod
1388 def tearDownClass(cls):
1389 support.unlink(support.TESTFN)
1390
1391 def setUp(self):
1392 self.server = SendfileTestServer((support.HOST, 0))
1393 self.server.start()
1394 self.client = socket.socket()
1395 self.client.connect((self.server.host, self.server.port))
1396 self.client.settimeout(1)
1397 # synchronize by waiting for "220 ready" response
1398 self.client.recv(1024)
1399 self.sockno = self.client.fileno()
1400 self.file = open(support.TESTFN, 'rb')
1401 self.fileno = self.file.fileno()
1402
1403 def tearDown(self):
1404 self.file.close()
1405 self.client.close()
1406 if self.server.running:
1407 self.server.stop()
1408
1409 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1410 """A higher level wrapper representing how an application is
1411 supposed to use sendfile().
1412 """
1413 while 1:
1414 try:
1415 if self.SUPPORT_HEADERS_TRAILERS:
1416 return os.sendfile(sock, file, offset, nbytes, headers,
1417 trailers)
1418 else:
1419 return os.sendfile(sock, file, offset, nbytes)
1420 except OSError as err:
1421 if err.errno == errno.ECONNRESET:
1422 # disconnected
1423 raise
1424 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1425 # we have to retry send data
1426 continue
1427 else:
1428 raise
1429
1430 def test_send_whole_file(self):
1431 # normal send
1432 total_sent = 0
1433 offset = 0
1434 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001435 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001436 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1437 if sent == 0:
1438 break
1439 offset += sent
1440 total_sent += sent
1441 self.assertTrue(sent <= nbytes)
1442 self.assertEqual(offset, total_sent)
1443
1444 self.assertEqual(total_sent, len(self.DATA))
1445 self.client.close()
1446 self.server.wait()
1447 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001448 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001449
1450 def test_send_at_certain_offset(self):
1451 # start sending a file at a certain offset
1452 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001453 offset = len(self.DATA) // 2
1454 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001455 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001456 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001457 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1458 if sent == 0:
1459 break
1460 offset += sent
1461 total_sent += sent
1462 self.assertTrue(sent <= nbytes)
1463
1464 self.client.close()
1465 self.server.wait()
1466 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001467 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001468 self.assertEqual(total_sent, len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001469 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001470
1471 def test_offset_overflow(self):
1472 # specify an offset > file size
1473 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001474 try:
1475 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1476 except OSError as e:
1477 # Solaris can raise EINVAL if offset >= file length, ignore.
1478 if e.errno != errno.EINVAL:
1479 raise
1480 else:
1481 self.assertEqual(sent, 0)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001482 self.client.close()
1483 self.server.wait()
1484 data = self.server.handler_instance.get_data()
1485 self.assertEqual(data, b'')
1486
1487 def test_invalid_offset(self):
1488 with self.assertRaises(OSError) as cm:
1489 os.sendfile(self.sockno, self.fileno, -1, 4096)
1490 self.assertEqual(cm.exception.errno, errno.EINVAL)
1491
1492 # --- headers / trailers tests
1493
1494 if SUPPORT_HEADERS_TRAILERS:
1495
1496 def test_headers(self):
1497 total_sent = 0
1498 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1499 headers=[b"x" * 512])
1500 total_sent += sent
1501 offset = 4096
1502 nbytes = 4096
1503 while 1:
1504 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1505 offset, nbytes)
1506 if sent == 0:
1507 break
1508 total_sent += sent
1509 offset += sent
1510
1511 expected_data = b"x" * 512 + self.DATA
1512 self.assertEqual(total_sent, len(expected_data))
1513 self.client.close()
1514 self.server.wait()
1515 data = self.server.handler_instance.get_data()
1516 self.assertEqual(hash(data), hash(expected_data))
1517
1518 def test_trailers(self):
1519 TESTFN2 = support.TESTFN + "2"
1520 f = open(TESTFN2, 'wb')
1521 f.write(b"abcde")
1522 f.close()
1523 f = open(TESTFN2, 'rb')
1524 try:
1525 os.sendfile(self.sockno, f.fileno(), 0, 4096, trailers=[b"12345"])
1526 self.client.close()
1527 self.server.wait()
1528 data = self.server.handler_instance.get_data()
1529 self.assertEqual(data, b"abcde12345")
1530 finally:
1531 os.remove(TESTFN2)
1532
1533 if hasattr(os, "SF_NODISKIO"):
1534 def test_flags(self):
1535 try:
1536 os.sendfile(self.sockno, self.fileno, 0, 4096,
1537 flags=os.SF_NODISKIO)
1538 except OSError as err:
1539 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1540 raise
1541
1542
Fred Drake2e2be372001-09-20 21:33:42 +00001543def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001544 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00001545 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001546 StatAttributeTests,
1547 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001548 WalkTests,
1549 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001550 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00001551 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00001552 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001553 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001554 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00001555 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00001556 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001557 Win32KillTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00001558 Win32SymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00001559 FSEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001560 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001561 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00001562 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001563 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001564 ProgramPriorityTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001565 )
Fred Drake2e2be372001-09-20 21:33:42 +00001566
1567if __name__ == "__main__":
1568 test_main()