blob: e075a6b3fc362d79bb1cf327e0a21eb56cf7e0d1 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
17import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000018import asyncore
19import asynchat
20import socket
21try:
22 import threading
23except ImportError:
24 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000025
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
#
# The flag is False when sys.thread_info is missing or reports no
# library version.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)
Brian Curtineb24d742010-04-12 17:16:38 +000034
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    # Exercises low-level os file primitives (open/close/access, closerange,
    # rename, read, write) against the scratch path support.TESTFN.

    def setUp(self):
        # Remove any stale TESTFN.  tearDown is an alias of this method, so
        # the same cleanup runs both before and after every test.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A freshly created file must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() returns exactly the bytes previously written.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Release the descriptor even when a check above fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command given by *args and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)
117
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000118
class TemporaryFileTests(unittest.TestCase):
    # Tests for the legacy temporary-name helpers (os.tempnam, os.tmpfile,
    # os.tmpnam), each skipped when the function is unavailable, plus a
    # small os.fdopen() check.

    def setUp(self):
        # Files registered in self.files are unlinked by tearDown.
        self.files = []
        os.mkdir(support.TESTFN)

    def tearDown(self):
        for name in self.files:
            os.unlink(name)
        os.rmdir(support.TESTFN)

    def check_tempfile(self, name):
        # make sure it doesn't already exist:
        self.assertFalse(os.path.exists(name),
                         "file already exists for temporary file")
        # make sure we can create the file; close it immediately instead of
        # leaving the handle to the garbage collector.
        with open(name, "w"):
            pass
        self.files.append(name)

    @unittest.skipUnless(hasattr(os, "tempnam"), "requires os.tempnam()")
    def test_tempnam(self):
        # catch_warnings() restores the global filter list afterwards, so
        # the "ignore" filter does not outlive this test.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
                                    r"test_os$")
            self.check_tempfile(os.tempnam())

            name = os.tempnam(support.TESTFN)
            self.check_tempfile(name)

            name = os.tempnam(support.TESTFN, "pfx")
            self.assertEqual(os.path.basename(name)[:3], "pfx")
            self.check_tempfile(name)

    @unittest.skipUnless(hasattr(os, "tmpfile"), "requires os.tmpfile()")
    def test_tmpfile(self):
        # As with test_tmpnam() below, the Windows implementation of tmpfile()
        # attempts to create a file in the root directory of the current drive.
        # On Vista and Server 2008, this test will always fail for normal users
        # as writing to the root directory requires elevated privileges.  With
        # XP and below, the semantics of tmpfile() are the same, but the user
        # running the test is more likely to have administrative privileges on
        # their account already.  If that's the case, then os.tmpfile() should
        # work.  In order to make this test as useful as possible, rather than
        # trying to detect Windows versions or whether or not the user has the
        # right permissions, just try and create a file in the root directory
        # and see if it raises a 'Permission denied' OSError.  If it does, then
        # test that a subsequent call to os.tmpfile() raises the same error. If
        # it doesn't, assume we're on XP or below and the user running the test
        # has administrative privileges, and proceed with the test as normal.
        if sys.platform == 'win32':
            name = '\\python_test_os_test_tmpfile.txt'
            if os.path.exists(name):
                os.remove(name)
            try:
                fp = open(name, 'w')
            except IOError as first:
                # open() failed, assert tmpfile() fails in the same way.
                # Although open() raises an IOError and os.tmpfile() raises an
                # OSError(), 'args' will be (13, 'Permission denied') in both
                # cases.
                try:
                    fp = os.tmpfile()
                except OSError as second:
                    self.assertEqual(first.args, second.args)
                else:
                    self.fail("expected os.tmpfile() to raise OSError")
                return
            else:
                # open() worked, therefore, tmpfile() should work.  Close our
                # dummy file and proceed with the test as normal.
                fp.close()
                os.remove(name)

        fp = os.tmpfile()
        fp.write("foobar")
        fp.seek(0,0)
        s = fp.read()
        fp.close()
        self.assertEqual(s, "foobar")

    @unittest.skipUnless(hasattr(os, "tmpnam"), "requires os.tmpnam()")
    def test_tmpnam(self):
        # catch_warnings() keeps the "ignore" filter local to this test.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
                                    r"test_os$")
            name = os.tmpnam()
            if sys.platform in ("win32",):
                # The Windows tmpnam() seems useless.  From the MS docs:
                #
                #     The character string that tmpnam creates consists of
                #     the path prefix, defined by the entry P_tmpdir in the
                #     file STDIO.H, followed by a sequence consisting of the
                #     digit characters '0' through '9'; the numerical value
                #     of this string is in the range 1 - 65,535.  Changing the
                #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not
                #     change the operation of tmpnam.
                #
                # The really bizarre part is that, at least under MSVC6,
                # P_tmpdir is "\\".  That is, the path returned refers to
                # the root of the current drive.  That's a terrible place to
                # put temp files, and, depending on privileges, the user
                # may not even be able to open a file in the root directory.
                self.assertFalse(os.path.exists(name),
                                 "file already exists for temporary file")
            else:
                self.check_tempfile(name)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the descriptor with os.fdopen() using
        # the extra *args (mode, buffer size), then close the wrapper.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        fp2 = os.fdopen(fd, *args)
        fp2.close()

    def test_fdopen(self):
        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)
235
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    def setUp(self):
        # Fixture: directory TESTFN containing a three-byte file "f1".
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        # Shared body for the str and bytes variants: *fname* is the path
        # handed to os.stat().
        if not hasattr(os, "stat"):
            self.skipTest("requires os.stat()")

        import stat
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # The time attributes are compared after truncation to
                    # an integer against the indexed value.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Indexing past the end must fail.
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but, as in the original test, not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    @unittest.skipUnless(hasattr(os, "statvfs"), "requires os.statvfs()")
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest("os.statvfs() is not implemented here")
            # Any other failure is a real error; re-raise it rather than
            # continuing with 'result' unbound.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but, as in the original test, not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    #
    # NOTE(review): the nesting below was reconstructed from a listing
    # whose indentation was lost -- confirm which tests belong under the
    # NTFS check and which only under the win32 check.
    if sys.platform == 'win32':
        def get_file_system(path):
            # Return the file system name reported for the drive holding
            # *path*; falls through to None when the Win32 call reports
            # failure.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == errno.ENOENT: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
406
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000407from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000408
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests against os.environ."""
    type2test = None

    def setUp(self):
        # Snapshot the str (and, where supported, bytes) environment so
        # tearDown can put it back, then seed the reference entries.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for name, setting in self._reference().items():
            os.environ[name] = setting

    def tearDown(self):
        # Restore the snapshots taken in setUp.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        # Known key/value pairs used by the mapping-protocol tests.
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        # The "empty mapping" under test is os.environ itself, cleared.
        os.environ.clear()
        return os.environ

    # Bug 1110478
    def test_update2(self):
        os.environ.clear()
        if not os.path.exists("/bin/sh"):
            return
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            echoed = popen.read().strip()
            self.assertEqual(echoed, "World")

    def test_os_popen_iter(self):
        if not os.path.exists("/bin/sh"):
            return
        with os.popen(
                "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
            lines = iter(popen)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, setting in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(setting), str)

    def test_items(self):
        for name, setting in self._reference().items():
            self.assertEqual(os.environ.get(name), setting)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        rendered_items = ', '.join(
            '{!r}: {!r}'.format(name, setting)
            for name, setting in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(rendered_items))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily swap os.environ for a plain dict to check the default.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        # Everything below needs bytes environment support.
        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        for env in ({b'PATH': b'abc'}, {b'PATH': 'abc'}, {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000535
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # The context manager closes the file even if write() fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        # 'flipped' is a bool used as a 0/1 offset into the result list.
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A directory symlink is removed as a link, not with rmdir.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000643
class MakedirTests(unittest.TestCase):
    # Tests for os.makedirs(), run inside a fresh support.TESTFN directory.

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # The umask is process-wide: restore it even if a check fails.
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Always remove the file: tearDown calls os.removedirs(), which
            # only removes directories.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
695
class DevNullTests(unittest.TestCase):
    """Checks that os.devnull can be written to and reads back as empty."""

    def test_devnull(self):
        # 'with' guarantees each handle is closed even if the write or the
        # assertion raises.
        with open(os.devnull, 'w') as sink:
            sink.write('hello')
        with open(os.devnull, 'r') as source:
            self.assertEqual(source.read(), '')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000704
class URandomTests(unittest.TestCase):
    """Length checks for os.urandom()."""

    def test_urandom(self):
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            # os.urandom() raising NotImplementedError is tolerated; the
            # remaining length checks are simply not performed.
            pass
714
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager that swaps os.execv and os.execve for recording stubs.

    The object bound by ``with ... as`` is a list.  Every stub call appends a
    ``(function name, first arg, remaining args)`` tuple to it.  The stubs
    never return: the execv stub raises RuntimeError and the execve stub
    raises OSError with errno ENOTDIR.  When *defpath* is not None it
    replaces os.defpath for the duration of the block.  os.execv, os.execve
    and os.defpath are put back on exit.
    """
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    try:
        saved = (os.execv, os.execve, os.defpath)
        os.execv, os.execve = fake_execv, fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv, os.execve, os.defpath = saved
747
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and, when present, the internal os._execvpe()."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        # A program name that cannot be found must surface as OSError.
        missing = 'no such app-'
        self.assertRaises(OSError, os.execvpe, missing, [missing], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list must be rejected with ValueError.
        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() against the recording stubs.

        *test_type* selects the flavour of the program name: ``bytes`` uses
        bytes names, anything else (the caller passes ``str``) uses text.
        """
        search_dir = os.sep + 'absolutepath'
        wants_bytes = test_type is bytes
        if wants_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            # Off Windows, the stubbed execve is expected to receive the
            # filesystem-encoded path.
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            self.assertRaises(RuntimeError, os._execvpe, fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = dict(env)
            env_path[b'PATH' if wants_bytes else 'PATH'] = search_dir
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected)

    def test_internal_execvpe_str(self):
        # The bytes flavour is only exercised off Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000811
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000812
class Win32ErrorTests(unittest.TestCase):
    """Each os call below must raise WindowsError for support.TESTFN."""
    # NOTE(review): most of these presumably rely on TESTFN not existing
    # when the test starts -- confirm against the rest of the suite.

    def test_rename(self):
        backup = support.TESTFN + ".bak"
        self.assertRaises(WindowsError, os.rename, support.TESTFN, backup)

    def test_remove(self):
        self.assertRaises(WindowsError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(WindowsError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        # mkdir must fail while a regular file of the same name exists.
        handle = open(support.TESTFN, "w")
        try:
            with handle:
                self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
        finally:
            # The handle is closed by now, so the file can be unlinked.
            os.unlink(support.TESTFN)

    def test_utime(self):
        self.assertRaises(WindowsError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000836
class TestInvalidFD(unittest.TestCase):
    """fd-taking os functions must report EBADF for an invalid descriptor."""

    # Functions taking the descriptor as their only argument; one test_<name>
    # method is generated for each of them below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    # "close" is left out because it doesn't raise an exception on some
    # platforms.

    def get_single(f):
        # Build a test method for os.<f>; it does nothing when the function
        # is missing on this platform.
        def helper(self):
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require an OSError carrying EBADF."""
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    def test_isatty(self):
        if not hasattr(os, "isatty"):
            return
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    def test_closerange(self):
        if not hasattr(os, "closerange"):
            return
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                continue
            # fd+i is a live descriptor: stop the scan here.
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    def test_dup2(self):
        if not hasattr(os, "dup2"):
            return
        self.check(os.dup2, 20)

    def test_fchmod(self):
        if not hasattr(os, "fchmod"):
            return
        self.check(os.fchmod, 0)

    def test_fchown(self):
        if not hasattr(os, "fchown"):
            return
        self.check(os.fchown, -1, -1)

    def test_fpathconf(self):
        if not hasattr(os, "fpathconf"):
            return
        self.check(os.fpathconf, "PC_NAME_MAX")

    def test_ftruncate(self):
        if not hasattr(os, "ftruncate"):
            return
        self.check(os.ftruncate, 0)

    def test_lseek(self):
        if not hasattr(os, "lseek"):
            return
        self.check(os.lseek, 0, 0)

    def test_read(self):
        if not hasattr(os, "read"):
            return
        self.check(os.read, 1)

    def test_tcsetpgrpt(self):
        if not hasattr(os, "tcsetpgrp"):
            return
        self.check(os.tcsetpgrp, 0)

    def test_write(self):
        if not hasattr(os, "write"):
            return
        self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000914
Brian Curtin1b9df392010-11-24 20:24:31 +0000915
class LinkTests(unittest.TestCase):
    """Tests for os.link() with text, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        for name in (self.file1, self.file2):
            if os.path.exists(name):
                os.unlink(name)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it as *file2*, and check both names
        open the same underlying file."""
        with open(file1, "w") as created:
            created.write("test")

        os.link(file1, file2)
        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(encoding),
                        self.file2.encode(encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Rename both attributes so tearDown() removes the files actually
        # created by this test.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
949 self._test_link(self.file1, self.file2)
950
Thomas Wouters477c8d52006-05-27 19:21:47 +0000951if sys.platform != 'win32':
952 class Win32ErrorTests(unittest.TestCase):
953 pass
954
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000955 class PosixUidGidTests(unittest.TestCase):
956 if hasattr(os, 'setuid'):
957 def test_setuid(self):
958 if os.getuid() != 0:
959 self.assertRaises(os.error, os.setuid, 0)
960 self.assertRaises(OverflowError, os.setuid, 1<<32)
961
962 if hasattr(os, 'setgid'):
963 def test_setgid(self):
964 if os.getuid() != 0:
965 self.assertRaises(os.error, os.setgid, 0)
966 self.assertRaises(OverflowError, os.setgid, 1<<32)
967
968 if hasattr(os, 'seteuid'):
969 def test_seteuid(self):
970 if os.getuid() != 0:
971 self.assertRaises(os.error, os.seteuid, 0)
972 self.assertRaises(OverflowError, os.seteuid, 1<<32)
973
974 if hasattr(os, 'setegid'):
975 def test_setegid(self):
976 if os.getuid() != 0:
977 self.assertRaises(os.error, os.setegid, 0)
978 self.assertRaises(OverflowError, os.setegid, 1<<32)
979
980 if hasattr(os, 'setreuid'):
981 def test_setreuid(self):
982 if os.getuid() != 0:
983 self.assertRaises(os.error, os.setreuid, 0, 0)
984 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
985 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000986
987 def test_setreuid_neg1(self):
988 # Needs to accept -1. We run this in a subprocess to avoid
989 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000990 subprocess.check_call([
991 sys.executable, '-c',
992 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000993
994 if hasattr(os, 'setregid'):
995 def test_setregid(self):
996 if os.getuid() != 0:
997 self.assertRaises(os.error, os.setregid, 0, 0)
998 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
999 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001000
1001 def test_setregid_neg1(self):
1002 # Needs to accept -1. We run this in a subprocess to avoid
1003 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001004 subprocess.check_call([
1005 sys.executable, '-c',
1006 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001007
1008 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001009 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001010 if support.TESTFN_UNENCODABLE:
1011 self.dir = support.TESTFN_UNENCODABLE
1012 else:
1013 self.dir = support.TESTFN
1014 self.bdir = os.fsencode(self.dir)
1015
1016 bytesfn = []
1017 def add_filename(fn):
1018 try:
1019 fn = os.fsencode(fn)
1020 except UnicodeEncodeError:
1021 return
1022 bytesfn.append(fn)
1023 add_filename(support.TESTFN_UNICODE)
1024 if support.TESTFN_UNENCODABLE:
1025 add_filename(support.TESTFN_UNENCODABLE)
1026 if not bytesfn:
1027 self.skipTest("couldn't create any non-ascii filename")
1028
1029 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001030 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001031 try:
1032 for fn in bytesfn:
1033 f = open(os.path.join(self.bdir, fn), "w")
1034 f.close()
Victor Stinnere8d51452010-08-19 01:05:19 +00001035 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001036 if fn in self.unicodefn:
1037 raise ValueError("duplicate filename")
1038 self.unicodefn.add(fn)
1039 except:
1040 shutil.rmtree(self.dir)
1041 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001042
1043 def tearDown(self):
1044 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001045
1046 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001047 expected = self.unicodefn
1048 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001049 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001050
1051 def test_open(self):
1052 for fn in self.unicodefn:
1053 f = open(os.path.join(self.dir, fn))
1054 f.close()
1055
1056 def test_stat(self):
1057 for fn in self.unicodefn:
1058 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001059else:
1060 class PosixUidGidTests(unittest.TestCase):
1061 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001062 class Pep383Tests(unittest.TestCase):
1063 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001064
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows (exit codes and console events)."""

    def _kill(self, sig):
        """Start a child interpreter, wait until it reports readiness on
        stdout, send it *sig* via os.kill and check its exit code is *sig*."""
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1s apart, while the child is alive.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop ends without a break: the child exited
            # or never wrote the readiness message in time.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Start win_console_handler.py in a new process group, wait for it
        to flag readiness through a shared mmap, send it *event* and fail
        (naming *name*) if it has not stopped shortly afterwards."""
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared mapping once the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # The child is expected to set the shared byte to 1 when ready.
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # NOTE(review): an exit code of 0 is treated like "still running"
        # here -- presumably the handler script never exits with 0; confirm.
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1178 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1179
1180
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and stat/remove on symlinks under Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions use unittest assertions rather than bare ``assert``
        # so they are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is needed here: the link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """stat() of *link* must equal stat() of *target* and differ from
        lstat() of the link, for both the text and bytes form of the name."""
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so the cleanup below can always
        # refer to it, even when creating the directories fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so a cleanup error
            # cannot hide the original failure.
            if os.path.exists(file1):
                os.remove(file1)
            if os.path.exists(level1):
                shutil.rmtree(level1)
1295 shutil.rmtree(level1)
1296
Brian Curtind40e6f72010-07-08 21:39:08 +00001297
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode and str given to fsdecode come back equal.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding: skip it.
                continue
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001310 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001311
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001312
class PidTests(unittest.TestCase):
    """Tests for process-id helpers in os."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1321 self.assertEqual(int(stdout), os.getpid())
1322
1323
# Adding this TestCase triggered at least two distinct errors on the *nix
# buildbots, so it is skipped for now to keep the buildbots moving.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # The login name must be a non-empty value.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
1331 self.assertNotEqual(len(user_name), 0)
1332
1333
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        which = os.PRIO_PROCESS
        base = os.getpriority(which, os.getpid())
        os.setpriority(which, os.getpid(), base + 1)
        try:
            new_prio = os.getpriority(which, os.getpid())
            # Starting at 19 or above and ending at 19 or below, the
            # outcome cannot be interpreted, so skip instead of asserting.
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest(
      "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to put the original priority back; only an EACCES failure
            # is tolerated.
            try:
                os.setpriority(which, os.getpid(), base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001355 raise
1356
1357
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening asyncore dispatcher whose event loop runs in its own
        thread; every accepted connection is handed to a Handler."""

        class Handler(asynchat.async_chat):
            """Per-connection channel that stores everything it receives."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting queued for the peer as soon as it connects.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return all bytes received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Record the address the socket was actually bound to.
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the server thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to finish and wait for the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # 'with' releases the lock even if asyncore.loop() raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1441
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001442
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile(), sending a file to a local SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers tests are only defined on platforms other than
    # Linux and Solaris/SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        """Create the file whose content is sent by the tests."""
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        """Start a server, connect a client to it and open the data file."""
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        The call is retried for as long as it fails with EAGAIN or EBUSY;
        any other OSError (ECONNRESET included) is propagated.  *headers*
        and *trailers* default to empty lists and are only passed on when
        SUPPORT_HEADERS_TRAILERS is true.  Returns what os.sendfile()
        returns.
        """
        # None sentinels instead of mutable default arguments.
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    # disconnected, or some other unexpected failure
                    raise
                # otherwise we have to retry sending the data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the lengths first: a mismatch is reported more readably.
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # a negative offset must be rejected with EINVAL
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            # send a 512 byte header in front of the file content
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            # Check the content itself (length first), like the other
            # tests do, rather than only comparing hash values.
            self.assertEqual(len(data), len(expected_data))
            self.assertEqual(data, expected_data)

        def test_trailers(self):
            # send a small file followed by a trailer
            TESTFN2 = support.TESTFN + "2"
            # Register the removal before the file is created, so that it
            # is cleaned up even if a later step fails.  This is the same
            # helper tearDownClass() uses for the main data file.
            self.addCleanup(support.unlink, TESTFN2)
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcde12345")

    if hasattr(os, "SF_NODISKIO"):
        def test_flags(self):
            # EBUSY / EAGAIN are accepted outcomes when SF_NODISKIO is set
            try:
                os.sendfile(self.sockno, self.fileno, 0, 4096,
                            flags=os.SF_NODISKIO)
            except OSError as err:
                if err.errno not in (errno.EBUSY, errno.EAGAIN):
                    raise
1614
1615
def test_main():
    """Run all the test case classes of this module via support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
    )
    support.run_unittest(*test_classes)


# Allow the module to be run directly as a script.
if __name__ == "__main__":
    test_main()