blob: 543241294c669b94856e41626e2e6f04da466866 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
17import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000018import asyncore
19import asynchat
20import socket
21try:
22 import threading
23except ImportError:
24 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000025
# Detect whether we are running on a Linux system whose threads come from
# the (now outdated and unmaintained) linuxthreads library.  Combining
# linuxthreads with a failed execv call is known to cause trouble: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = False
if threading:
    info = threading._info()
    # The flag keeps its False default when no pthread version is reported.
    if 'pthread_version' in info:
        pthread_version = info['pthread_version']
        USING_LINUXTHREADS = pthread_version.startswith("linuxthreads")
Brian Curtineb24d742010-04-12 17:16:38 +000039
Thomas Wouters0e3f5912006-08-11 14:57:12 +000040# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level file-descriptor functions of the os module.

    Each test creates support.TESTFN; setUp() and tearDown() (the same
    function) remove it so that every test starts and ends without it.
    """

    def setUp(self):
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails,
            # so that tearDown() can still unlink the file.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and check it exits with 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)
121
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000122
class TemporaryFileTests(unittest.TestCase):
    """Tests for the legacy temporary-name helpers and for os.fdopen().

    setUp() creates support.TESTFN as a directory.  Files registered in
    self.files by check_tempfile() are unlinked by tearDown(), which then
    removes the directory again.
    """

    def setUp(self):
        self.files = []
        os.mkdir(support.TESTFN)

    def tearDown(self):
        for name in self.files:
            os.unlink(name)
        os.rmdir(support.TESTFN)

    def check_tempfile(self, name):
        """Check that *name* does not exist yet and that it can be created.

        The created file is recorded in self.files for removal in tearDown().
        """
        # make sure it doesn't already exist:
        self.assertFalse(os.path.exists(name),
                    "file already exists for temporary file")
        # make sure we can create the file; close it right away so that no
        # open handle outlives this check.
        with open(name, "w"):
            pass
        self.files.append(name)

    def test_tempnam(self):
        if not hasattr(os, "tempnam"):
            self.skipTest("os.tempnam() is not available")
        # Keep the filter local to this test instead of leaving it in the
        # process-wide warning filters.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
                                    r"test_os$")
            self.check_tempfile(os.tempnam())

            name = os.tempnam(support.TESTFN)
            self.check_tempfile(name)

            name = os.tempnam(support.TESTFN, "pfx")
            self.assertTrue(os.path.basename(name).startswith("pfx"))
            self.check_tempfile(name)

    def test_tmpfile(self):
        if not hasattr(os, "tmpfile"):
            self.skipTest("os.tmpfile() is not available")
        # As with test_tmpnam() below, the Windows implementation of tmpfile()
        # attempts to create a file in the root directory of the current drive.
        # On Vista and Server 2008, this test will always fail for normal users
        # as writing to the root directory requires elevated privileges.  With
        # XP and below, the semantics of tmpfile() are the same, but the user
        # running the test is more likely to have administrative privileges on
        # their account already.  If that's the case, then os.tmpfile() should
        # work.  In order to make this test as useful as possible, rather than
        # trying to detect Windows versions or whether or not the user has the
        # right permissions, just try and create a file in the root directory
        # and see if it raises a 'Permission denied' OSError.  If it does, then
        # test that a subsequent call to os.tmpfile() raises the same error. If
        # it doesn't, assume we're on XP or below and the user running the test
        # has administrative privileges, and proceed with the test as normal.
        if sys.platform == 'win32':
            name = '\\python_test_os_test_tmpfile.txt'
            if os.path.exists(name):
                os.remove(name)
            try:
                fp = open(name, 'w')
            except IOError as first:
                # open() failed, assert tmpfile() fails in the same way.
                # Although open() raises an IOError and os.tmpfile() raises an
                # OSError(), 'args' will be (13, 'Permission denied') in both
                # cases.
                try:
                    fp = os.tmpfile()
                except OSError as second:
                    self.assertEqual(first.args, second.args)
                else:
                    # Do not leave the unexpected file object open.
                    fp.close()
                    self.fail("expected os.tmpfile() to raise OSError")
                return
            else:
                # open() worked, therefore, tmpfile() should work.  Close our
                # dummy file and proceed with the test as normal.
                fp.close()
                os.remove(name)

        fp = os.tmpfile()
        try:
            fp.write("foobar")
            fp.seek(0,0)
            s = fp.read()
        finally:
            fp.close()
        self.assertEqual(s, "foobar")

    def test_tmpnam(self):
        if not hasattr(os, "tmpnam"):
            self.skipTest("os.tmpnam() is not available")
        # Keep the filter local to this test instead of leaving it in the
        # process-wide warning filters.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
                                    r"test_os$")
            name = os.tmpnam()
            if sys.platform in ("win32",):
                # The Windows tmpnam() seems useless.  From the MS docs:
                #
                #     The character string that tmpnam creates consists of
                #     the path prefix, defined by the entry P_tmpdir in the
                #     file STDIO.H, followed by a sequence consisting of the
                #     digit characters '0' through '9'; the numerical value
                #     of this string is in the range 1 - 65,535.  Changing the
                #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not
                #     change the operation of tmpnam.
                #
                # The really bizarre part is that, at least under MSVC6,
                # P_tmpdir is "\\".  That is, the path returned refers to
                # the root of the current drive.  That's a terrible place to
                # put temp files, and, depending on privileges, the user
                # may not even be able to open a file in the root directory.
                self.assertFalse(os.path.exists(name),
                            "file already exists for temporary file")
            else:
                self.check_tempfile(name)

    def fdopen_helper(self, *args):
        """Open support.TESTFN read-only and wrap the fd via os.fdopen(fd, *args)."""
        # Note: support.TESTFN is the directory created by setUp().
        fd = os.open(support.TESTFN, os.O_RDONLY)
        fp2 = os.fdopen(fd, *args)
        fp2.close()

    def test_fdopen(self):
        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)
239
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000240# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Test attributes on return values from the os.*stat* family.

    setUp() creates the directory support.TESTFN containing a three-byte
    file "f1" (self.fname); tearDown() removes both again.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        """Check the os.stat() result for *fname* (a str or bytes path).

        The file is expected to be three bytes long, as written by setUp().
        """
        if not hasattr(os, "stat"):
            self.skipTest("os.stat() is not available")

        import stat
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                self.assertIn(attr, members)
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # The attribute value is truncated with int() before
                    # being compared with the tuple item.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Both outcomes are
        # accepted here: a TypeError is tolerated but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            self.skipTest("os.statvfs() is not available")

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest("os.statvfs() failed with ENOSYS")
            # Any other error is a genuine failure; report it as such
            # instead of continuing without a result.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  Both outcomes are
        # accepted here: a TypeError is tolerated but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Return the file system name reported for the drive holding
            # *path*, or None when GetVolumeInformationW() fails.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == errno.ENOENT:
                    # file does not exist; cannot run test
                    self.skipTest(r"c:\pagefile.sys does not exist")
                self.fail("Could not stat pagefile.sys")
410
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000411from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000412
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment(s) so that tearDown() can restore them,
        # then install the reference entries.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        os.environ.update(self._reference())

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1": "VALUE1", "KEY2": "VALUE2", "KEY3": "VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    def test_update2(self):
        os.environ.clear()
        if not os.path.exists("/bin/sh"):
            return
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    def test_os_popen_iter(self):
        if not os.path.exists("/bin/sh"):
            return
        command = "/bin/sh -c 'echo \"line1\nline2\nline3\"'"
        with os.popen(command) as popen:
            it = iter(popen)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(it), expected)
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        reference = self._reference()
        for key in reference:
            self.assertEqual(os.environ.get(key), reference[key])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        formatted_items = ', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(formatted_items))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily replace os.environ itself by a plain dict to check
        # that defaulting to os.environ works.
        saved_environ = os.environ
        os.environ = dict(test_env)
        try:
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        for env in ({b'PATH': b'abc'}, {b'PATH': 'abc'}, {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000539
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = os.path.join(support.TESTFN, "TEST1")
        sub1_path = os.path.join(walk_path, "SUB1")
        sub11_path = os.path.join(sub1_path, "SUB11")
        sub2_path = os.path.join(walk_path, "SUB2")
        tmp1_path = os.path.join(walk_path, "tmp1")
        tmp2_path = os.path.join(sub1_path, "tmp2")
        tmp3_path = os.path.join(sub2_path, "tmp3")
        link_path = os.path.join(sub2_path, "link")
        t2_path = os.path.join(support.TESTFN, "TEST2")
        tmp4_path = os.path.join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        for directory in (sub11_path, sub2_path, t2_path):
            os.makedirs(directory)
        for path in (tmp1_path, tmp2_path, tmp3_path, tmp4_path):
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        # The symlink is only created (and expected) where supported.
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_dirs = ["link"]
        else:
            sub2_dirs = []
        sub2_tree = (sub2_path, sub2_dirs, ["tmp3"])

        # Walk top-down.
        top_down = list(os.walk(walk_path))
        self.assertEqual(len(top_down), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = top_down[0][1][0] != "SUB1"
        top_down[0][1].sort()
        self.assertEqual(top_down[0],
                         (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(top_down[1 + flipped],
                         (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(top_down[2 + flipped], (sub11_path, [], []))
        self.assertEqual(top_down[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        pruned = []
        for root, dirs, files in os.walk(walk_path):
            pruned.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to pruned!
                dirs.remove('SUB1')
        self.assertEqual(len(pruned), 2)
        self.assertEqual(pruned[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(pruned[1], sub2_tree)

        # Walk bottom-up.
        bottom_up = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(bottom_up), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = bottom_up[3][1][0] != "SUB1"
        bottom_up[3][1].sort()
        self.assertEqual(bottom_up[3],
                         (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(bottom_up[flipped], (sub11_path, [], []))
        self.assertEqual(bottom_up[flipped + 1],
                         (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(bottom_up[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlinked directory is removed as a link, not as a
                # directory.
                if os.path.islink(dirname):
                    os.remove(dirname)
                else:
                    os.rmdir(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000647
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh support.TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # The umask is process-wide: restore it even when a check above
            # fails, so that later tests are not affected.
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Always remove the regular file: tearDown() uses
            # os.removedirs(), which only removes directories.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
699
class DevNullTests(unittest.TestCase):
    """Check that os.devnull swallows writes and reads back as empty."""

    def test_devnull(self):
        # Context managers close the handles even if the assertion fails.
        with open(os.devnull, 'w') as f:
            f.write('hello')
        with open(os.devnull, 'r') as f:
            self.assertEqual(f.read(), '')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000708
class URandomTests(unittest.TestCase):
    """Check that os.urandom(n) returns exactly n bytes."""

    def test_urandom(self):
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            # No randomness source is available here; nothing to check.
            pass
718
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is replaced by it for the duration
    of the block.  The value yielded is the list of recorded calls; each
    entry is a tuple (function name, first arg, remaining args).
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals *before* entering the try block so the finally
    # clause can never refer to an unbound name (and mask the real error)
    # if one of these attribute lookups fails.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
751
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with str or bytes program names.

        *test_type* is either ``str`` or ``bytes`` and selects the type of
        the program name handed to os._execvpe().  The real exec functions
        are replaced by recording mocks for the duration of each check.
        """
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # The path recorded by the mock is bytes everywhere but on nt.
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised on non-nt platforms.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000815
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000816
Thomas Wouters477c8d52006-05-27 19:21:47 +0000817class Win32ErrorTests(unittest.TestCase):
818 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000819 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000820
821 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000822 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000823
824 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000825 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000826
827 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000828 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +0000829 try:
830 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
831 finally:
832 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000833 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000834
835 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000836 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000837
Thomas Wouters477c8d52006-05-27 19:21:47 +0000838 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +0000839 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000840
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000841class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000842 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000843 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
844 #singles.append("close")
845 #We omit close because it doesn'r raise an exception on some platforms
846 def get_single(f):
847 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000848 if hasattr(os, f):
849 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000850 return helper
851 for f in singles:
852 locals()["test_"+f] = get_single(f)
853
Benjamin Peterson7522c742009-01-19 21:00:09 +0000854 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000855 try:
856 f(support.make_bad_fd(), *args)
857 except OSError as e:
858 self.assertEqual(e.errno, errno.EBADF)
859 else:
860 self.fail("%r didn't raise a OSError with a bad file descriptor"
861 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000862
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000863 def test_isatty(self):
864 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000865 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000866
867 def test_closerange(self):
868 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000869 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000870 # Make sure none of the descriptors we are about to close are
871 # currently valid (issue 6542).
872 for i in range(10):
873 try: os.fstat(fd+i)
874 except OSError:
875 pass
876 else:
877 break
878 if i < 2:
879 raise unittest.SkipTest(
880 "Unable to acquire a range of invalid file descriptors")
881 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000882
883 def test_dup2(self):
884 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000885 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000886
887 def test_fchmod(self):
888 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000889 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000890
891 def test_fchown(self):
892 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000893 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000894
895 def test_fpathconf(self):
896 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000897 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000898
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000899 def test_ftruncate(self):
900 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000901 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000902
903 def test_lseek(self):
904 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000905 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000906
907 def test_read(self):
908 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000909 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000910
911 def test_tcsetpgrpt(self):
912 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000913 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000914
915 def test_write(self):
916 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000917 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000918
Brian Curtin1b9df392010-11-24 20:24:31 +0000919
920class LinkTests(unittest.TestCase):
921 def setUp(self):
922 self.file1 = support.TESTFN
923 self.file2 = os.path.join(support.TESTFN + "2")
924
Brian Curtinc0abc4e2010-11-30 23:46:54 +0000925 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +0000926 for file in (self.file1, self.file2):
927 if os.path.exists(file):
928 os.unlink(file)
929
Brian Curtin1b9df392010-11-24 20:24:31 +0000930 def _test_link(self, file1, file2):
931 with open(file1, "w") as f1:
932 f1.write("test")
933
934 os.link(file1, file2)
935 with open(file1, "r") as f1, open(file2, "r") as f2:
936 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
937
938 def test_link(self):
939 self._test_link(self.file1, self.file2)
940
941 def test_link_bytes(self):
942 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
943 bytes(self.file2, sys.getfilesystemencoding()))
944
Brian Curtinf498b752010-11-30 15:54:04 +0000945 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +0000946 try:
Brian Curtinf498b752010-11-30 15:54:04 +0000947 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +0000948 except UnicodeError:
949 raise unittest.SkipTest("Unable to encode for this platform.")
950
Brian Curtinf498b752010-11-30 15:54:04 +0000951 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +0000952 self.file2 = self.file1 + "2"
953 self._test_link(self.file1, self.file2)
954
Thomas Wouters477c8d52006-05-27 19:21:47 +0000955if sys.platform != 'win32':
956 class Win32ErrorTests(unittest.TestCase):
957 pass
958
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000959 class PosixUidGidTests(unittest.TestCase):
960 if hasattr(os, 'setuid'):
961 def test_setuid(self):
962 if os.getuid() != 0:
963 self.assertRaises(os.error, os.setuid, 0)
964 self.assertRaises(OverflowError, os.setuid, 1<<32)
965
966 if hasattr(os, 'setgid'):
967 def test_setgid(self):
968 if os.getuid() != 0:
969 self.assertRaises(os.error, os.setgid, 0)
970 self.assertRaises(OverflowError, os.setgid, 1<<32)
971
972 if hasattr(os, 'seteuid'):
973 def test_seteuid(self):
974 if os.getuid() != 0:
975 self.assertRaises(os.error, os.seteuid, 0)
976 self.assertRaises(OverflowError, os.seteuid, 1<<32)
977
978 if hasattr(os, 'setegid'):
979 def test_setegid(self):
980 if os.getuid() != 0:
981 self.assertRaises(os.error, os.setegid, 0)
982 self.assertRaises(OverflowError, os.setegid, 1<<32)
983
984 if hasattr(os, 'setreuid'):
985 def test_setreuid(self):
986 if os.getuid() != 0:
987 self.assertRaises(os.error, os.setreuid, 0, 0)
988 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
989 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000990
991 def test_setreuid_neg1(self):
992 # Needs to accept -1. We run this in a subprocess to avoid
993 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000994 subprocess.check_call([
995 sys.executable, '-c',
996 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000997
998 if hasattr(os, 'setregid'):
999 def test_setregid(self):
1000 if os.getuid() != 0:
1001 self.assertRaises(os.error, os.setregid, 0, 0)
1002 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1003 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001004
1005 def test_setregid_neg1(self):
1006 # Needs to accept -1. We run this in a subprocess to avoid
1007 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001008 subprocess.check_call([
1009 sys.executable, '-c',
1010 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001011
1012 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001013 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001014 if support.TESTFN_UNENCODABLE:
1015 self.dir = support.TESTFN_UNENCODABLE
1016 else:
1017 self.dir = support.TESTFN
1018 self.bdir = os.fsencode(self.dir)
1019
1020 bytesfn = []
1021 def add_filename(fn):
1022 try:
1023 fn = os.fsencode(fn)
1024 except UnicodeEncodeError:
1025 return
1026 bytesfn.append(fn)
1027 add_filename(support.TESTFN_UNICODE)
1028 if support.TESTFN_UNENCODABLE:
1029 add_filename(support.TESTFN_UNENCODABLE)
1030 if not bytesfn:
1031 self.skipTest("couldn't create any non-ascii filename")
1032
1033 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001034 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001035 try:
1036 for fn in bytesfn:
1037 f = open(os.path.join(self.bdir, fn), "w")
1038 f.close()
Victor Stinnere8d51452010-08-19 01:05:19 +00001039 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001040 if fn in self.unicodefn:
1041 raise ValueError("duplicate filename")
1042 self.unicodefn.add(fn)
1043 except:
1044 shutil.rmtree(self.dir)
1045 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001046
1047 def tearDown(self):
1048 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001049
1050 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001051 expected = self.unicodefn
1052 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001053 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001054
1055 def test_open(self):
1056 for fn in self.unicodefn:
1057 f = open(os.path.join(self.dir, fn))
1058 f.close()
1059
1060 def test_stat(self):
1061 for fn in self.unicodefn:
1062 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001063else:
1064 class PosixUidGidTests(unittest.TestCase):
1065 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001066 class Pep383Tests(unittest.TestCase):
1067 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001068
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows (exit codes and console events)."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_tries * 0.1 seconds.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # The child flips the single byte of this tagged shared-memory
        # mapping to 1 once it has finished initializing.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping whatever the outcome of the test.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit code of 0 means the process did stop.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1183
1184
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for file and directory symlinks on Windows."""

    # Link names are relative; the targets are this test file and the
    # directory that contains it.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Targets must exist and no link may be left over from an
        # earlier run.
        assert os.path.exists(self.dirlink_target)
        assert os.path.exists(self.filelink_target)
        assert not os.path.exists(self.dirlink)
        assert not os.path.exists(self.filelink)
        assert not os.path.exists(self.missing_link)

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists: the missing link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        link = self.dirlink
        os.symlink(self.dirlink_target, link)
        self.assertTrue(os.path.exists(link))
        self.assertTrue(os.path.isdir(link))
        self.assertTrue(os.path.islink(link))
        self.check_stat(link, self.dirlink_target)

    def test_file_link(self):
        link = self.filelink
        os.symlink(self.filelink_target, link)
        self.assertTrue(os.path.exists(link))
        self.assertTrue(os.path.isfile(link))
        self.assertTrue(os.path.islink(link))
        self.check_stat(link, self.filelink_target)

    def _create_missing_dir_link(self):
        """Create a "directory" link whose target does not exist."""
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        # Third argument is target_is_directory.
        os.symlink(target, linkname, True)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """stat() must follow the link; lstat() must describe the link."""
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))
1255
1256
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes pass through fsencode and str through fsdecode unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                bytesfn = os.fsencode(fn)
            except UnicodeEncodeError:
                # Not encodable with this file system encoding.
                pass
            else:
                self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001270
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001271
class PidTests(unittest.TestCase):
    """Tests for the process-id functions of the os module."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1281
1282
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check that os.getlogin() returns a non-empty name (currently skipped)."""

    def test_getlogin(self):
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
1291
1292
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            # Starting at 19 or above, the increment cannot be observed
            # reliably, so the test is skipped instead of failed.
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest(
      "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to put the original priority back; an EACCES refusal is
            # tolerated, any other error is propagated.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1315
1316
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Threaded asyncore server that buffers whatever a client sends.

    The listening socket is bound to *address*.  The accepted connection is
    wrapped in a Handler, available afterwards as ``handler_instance``,
    whose get_data() returns everything received so far.

    NOTE(review): the base class list dereferences ``threading``
    unconditionally, so this definition presumably fails on builds where the
    module-level ``threading`` fallback is None -- confirm.
    """

    class Handler(asynchat.async_chat):
        """Per-connection channel: greets the client, then collects data."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            self.closed = False
            # Greeting the client waits for before it starts sending.
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            self.in_buffer.append(data)

        def get_data(self):
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception being handled instead of hiding it.
            raise

    def __init__(self, address):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        return self._active

    def start(self):
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        # Block until run() has signalled that the loop is starting.
        self.__flag.wait()

    def stop(self):
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # The with statement releases the lock even when the loop
            # raises (handle_error re-raises); a bare acquire()/release()
            # pair would leave it held forever in that case.
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception being handled instead of hiding it.
        raise
1399
1400
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer.

    The class writes DATA to support.TESTFN once.  Each test connects a
    client socket to a freshly started server, sends (part of) the file
    with os.sendfile() and compares what the server-side handler
    received with the expected bytes.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are only exercised on platforms other
    # than Linux and Solaris/SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        """Create the file that every test sends."""
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        """Remove the file created by setUpClass()."""
        support.unlink(support.TESTFN)

    def setUp(self):
        """Start a server, connect a client to it and open the data file."""
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        """Close the file and the client; stop the server if still running."""
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries for as long as os.sendfile() fails with EAGAIN or EBUSY;
        any other OSError is propagated.  Returns the number of bytes
        reported by os.sendfile().  *headers* and *trailers* default to
        empty lists and are only passed on when SUPPORT_HEADERS_TRAILERS
        is true.
        """
        # None sentinels avoid sharing one mutable default between calls.
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    # disconnected (ECONNRESET) or any other failure
                    raise
                # otherwise we have to retry sending the data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # a negative offset must be rejected with EINVAL
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            # the first call sends a 512 byte header before the file data
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            # Compare the bytes themselves (length first, as the other
            # tests do) rather than their hashes.
            self.assertEqual(len(data), len(expected_data))
            self.assertEqual(data, expected_data)

        def test_trailers(self):
            TESTFN2 = support.TESTFN + "2"
            # Register the cleanup before creating the file so that it is
            # removed even if writing or reopening it fails.
            self.addCleanup(support.unlink, TESTFN2)
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcde12345")

    if hasattr(os, "SF_NODISKIO"):
        def test_flags(self):
            # EBUSY / EAGAIN are tolerated; anything else is an error
            try:
                os.sendfile(self.sockno, self.fileno, 0, 4096,
                            flags=os.SF_NODISKIO)
            except OSError as err:
                if err.errno not in (errno.EBUSY, errno.EAGAIN):
                    raise
1572
1573
def test_main():
    """Run all test case classes of this module via support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
    )
    support.run_unittest(*test_classes)


# Allow running this file directly as a script.
if __name__ == "__main__":
    test_main()