blob: 5f39f64340de1f6c30828f36626e4c4f533099b2 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
17import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000018import asyncore
19import asynchat
20import socket
21try:
22 import threading
23except ImportError:
24 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000025
# Record whether the interpreter runs on top of linuxthreads, the outdated
# and unmaintained Linux threading library.  Combining linuxthreads with a
# failed execv call is known to be problematic: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)
Brian Curtineb24d742010-04-12 17:16:38 +000034
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Exercise the file-descriptor level functions of os on support.TESTFN."""

    def setUp(self):
        # Remove any leftover TESTFN; the same routine doubles as tearDown.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A file we just created must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() on the descriptor of a regular file returns bytes.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion above fails so
            # that it is not leaked into the following tests.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run *args* as a command in a fresh console and require exit code 0.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)
116
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000117
class TemporaryFileTests(unittest.TestCase):
    """Tests for the legacy temporary-file helpers of os, plus os.fdopen().

    The tempnam()/tmpfile()/tmpnam() tests only run when the running
    interpreter's os module still provides the function.
    """

    def setUp(self):
        # Names of files created by a test; tearDown unlinks each of them.
        self.files = []
        os.mkdir(support.TESTFN)

    def tearDown(self):
        for name in self.files:
            os.unlink(name)
        os.rmdir(support.TESTFN)

    def check_tempfile(self, name):
        # make sure it doesn't already exist:
        self.assertFalse(os.path.exists(name),
                    "file already exists for temporary file")
        # make sure we can create the file; close it right away so the
        # handle is not leaked and tearDown can unlink the file.
        with open(name, "w"):
            pass
        self.files.append(name)

    def test_tempnam(self):
        if not hasattr(os, "tempnam"):
            self.skipTest("os.tempnam() is not available")
        # Keep the "ignore" filter local to this test instead of leaving
        # it installed in the global warnings filter list.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
                                    r"test_os$")
            self.check_tempfile(os.tempnam())

            name = os.tempnam(support.TESTFN)
            self.check_tempfile(name)

            name = os.tempnam(support.TESTFN, "pfx")
            self.assertEqual(os.path.basename(name)[:3], "pfx")
            self.check_tempfile(name)

    def test_tmpfile(self):
        if not hasattr(os, "tmpfile"):
            self.skipTest("os.tmpfile() is not available")
        # As with test_tmpnam() below, the Windows implementation of tmpfile()
        # attempts to create a file in the root directory of the current drive.
        # On Vista and Server 2008, this test will always fail for normal users
        # as writing to the root directory requires elevated privileges.  With
        # XP and below, the semantics of tmpfile() are the same, but the user
        # running the test is more likely to have administrative privileges on
        # their account already.  If that's the case, then os.tmpfile() should
        # work.  In order to make this test as useful as possible, rather than
        # trying to detect Windows versions or whether or not the user has the
        # right permissions, just try and create a file in the root directory
        # and see if it raises a 'Permission denied' OSError.  If it does, then
        # test that a subsequent call to os.tmpfile() raises the same error. If
        # it doesn't, assume we're on XP or below and the user running the test
        # has administrative privileges, and proceed with the test as normal.
        if sys.platform == 'win32':
            name = '\\python_test_os_test_tmpfile.txt'
            if os.path.exists(name):
                os.remove(name)
            try:
                fp = open(name, 'w')
            except IOError as first:
                # open() failed, assert tmpfile() fails in the same way.
                # Although open() raises an IOError and os.tmpfile() raises an
                # OSError(), 'args' will be (13, 'Permission denied') in both
                # cases.
                try:
                    fp = os.tmpfile()
                except OSError as second:
                    self.assertEqual(first.args, second.args)
                else:
                    # Do not leak the unexpectedly created file object.
                    fp.close()
                    self.fail("expected os.tmpfile() to raise OSError")
                return
            else:
                # open() worked, therefore, tmpfile() should work.  Close our
                # dummy file and proceed with the test as normal.
                fp.close()
                os.remove(name)

        fp = os.tmpfile()
        try:
            fp.write("foobar")
            fp.seek(0,0)
            s = fp.read()
        finally:
            fp.close()
        self.assertEqual(s, "foobar")

    def test_tmpnam(self):
        if not hasattr(os, "tmpnam"):
            self.skipTest("os.tmpnam() is not available")
        # Scope the "ignore" filter to this test only.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
                                    r"test_os$")
            name = os.tmpnam()
        if sys.platform in ("win32",):
            # The Windows tmpnam() seems useless.  From the MS docs:
            #
            #     The character string that tmpnam creates consists of
            #     the path prefix, defined by the entry P_tmpdir in the
            #     file STDIO.H, followed by a sequence consisting of the
            #     digit characters '0' through '9'; the numerical value
            #     of this string is in the range 1 - 65,535.  Changing the
            #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not
            #     change the operation of tmpnam.
            #
            # The really bizarre part is that, at least under MSVC6,
            # P_tmpdir is "\\".  That is, the path returned refers to
            # the root of the current drive.  That's a terrible place to
            # put temp files, and, depending on privileges, the user
            # may not even be able to open a file in the root directory.
            self.assertFalse(os.path.exists(name),
                        "file already exists for temporary file")
        else:
            self.check_tempfile(name)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only at the descriptor level, wrap the
        # descriptor with os.fdopen(*args) and close the resulting object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        fp2 = os.fdopen(fd, *args)
        fp2.close()

    def test_fdopen(self):
        # os.fdopen() must accept no extra argument, a mode, and a mode
        # plus a buffer size.
        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)
234
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Check the result objects of os.stat()/os.statvfs() and os.utime()."""

    def setUp(self):
        # Fixture: directory TESTFN containing a three-byte file "f1".
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        # Shared body for the str and bytes variants of the stat test;
        # *fname* is the path of the three-byte fixture file.
        if not hasattr(os, "stat"):
            return

        import stat
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                if name.endswith("TIME"):
                    # The tuple slots hold integer timestamps while the
                    # attributes may be floats, so truncate before comparing.
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                  result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Indexing past the end of the result must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Either outcome
        # (success or TypeError) is accepted here.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            self.skipTest("os.statvfs() is not available")

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest("os.statvfs() is not implemented (ENOSYS)")
            # Any other error is a genuine failure; swallowing it would
            # only surface later as a NameError on 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  Either outcome
        # (success or TypeError) is accepted here.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        # os.utime() must also work on a directory.
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Return the file system name of the volume holding *path*
            # as reported by GetVolumeInformationW, or None if the call
            # fails.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            # NOTE(review): the indentation of this file was lost; this
            # test is kept inside the NTFS-only branch next to
            # test_1565150 -- confirm against the upstream file.
            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
405
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000406from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000407
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """check that os.environ object conform to mapping protocol"""
    type2test = None

    def setUp(self):
        # Snapshot the str (and, where available, bytes) environment so
        # tearDown can restore it, then seed the reference variables.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        reference = self._reference()
        for name, text in reference.items():
            os.environ[name] = text

    def tearDown(self):
        # Put back exactly what setUp recorded.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        # Sample variables used by the mapping-protocol tests.
        return {
            "KEY1": "VALUE1",
            "KEY2": "VALUE2",
            "KEY3": "VALUE3",
        }

    def _empty_mapping(self):
        # The mapping under test is os.environ itself, emptied in place.
        os.environ.clear()
        return os.environ

    # Bug 1110478
    def test_update2(self):
        os.environ.clear()
        if not os.path.exists("/bin/sh"):
            return
        os.environ.update(HELLO="World")
        # A variable set through update() must be visible to a child shell.
        with os.popen("/bin/sh -c 'echo $HELLO'") as pipe:
            echoed = pipe.read().strip()
            self.assertEqual(echoed, "World")

    def test_os_popen_iter(self):
        if not os.path.exists("/bin/sh"):
            return
        command = "/bin/sh -c 'echo \"line1\nline2\nline3\"'"
        # The object returned by os.popen() yields one line per iteration.
        with os.popen(command) as pipe:
            lines = iter(pipe)
            self.assertEqual(next(lines), "line1\n")
            self.assertEqual(next(lines), "line2\n")
            self.assertEqual(next(lines), "line3\n")
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, text in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(text), str)

    def test_items(self):
        # Every reference variable seeded by setUp must be readable back.
        reference = self._reference()
        for name, text in reference.items():
            self.assertEqual(os.environ.get(name), text)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        pairs = ', '.join('{!r}: {!r}'.format(name, text)
                          for name, text in env.items())
        expected = 'environ({{{}}})'.format(pairs)
        self.assertEqual(repr(env), expected)

    def test_get_exec_path(self):
        default_dirs = os.defpath.split(os.pathsep)
        search_dirs = ['/monty', '/python', '', '/flying/circus']
        fake_env = {'PATH': os.pathsep.join(search_dirs)}

        # Swap os.environ for a plain dict for the duration of the
        # "defaulting" checks, and always swap the real one back in.
        real_environ = os.environ
        try:
            os.environ = dict(fake_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(search_dirs, os.get_exec_path())
            self.assertSequenceEqual(search_dirs,
                                     os.get_exec_path(env=None))
        finally:
            os.environ = real_environ

        # No PATH environment variable
        self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(search_dirs, os.get_exec_path(fake_env))

        # Everything below needs bytes environment support.
        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        bytes_envs = ({b'PATH': b'abc'}, {b'PATH': 'abc'}, {'PATH': b'abc'})
        for bytes_env in bytes_envs:
            self.assertSequenceEqual(os.get_exec_path(bytes_env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        text = 'euro\u20ac'
        try:
            encoded = text.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = text
        self.assertEqual(os.environ['unicode'], text)
        self.assertEqual(os.environb[b'unicode'], encoded)

        # os.environb -> os.environ
        raw = b'\xff'
        os.environb[b'bytes'] = raw
        self.assertEqual(os.environb[b'bytes'], raw)
        decoded = raw.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], decoded)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000534
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # 'with' closes the file even if the write fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is removed as a link, never
                # with rmdir.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000642
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            # NOTE(review): per the os.makedirs() documentation, the mode
            # mismatch check behind this assertion was removed in Python
            # 3.4.1 -- confirm which versions this file must support.
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # The umask is process-wide state; restore it even when an
            # assertion above fails.
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Always remove the file: tearDown only removes directories.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
694
class DevNullTests(unittest.TestCase):
    """Checks that os.devnull can be opened for writing and for reading."""

    def test_devnull(self):
        # Writes to the null device are accepted; the ``with`` block closes
        # (and therefore flushes) the file, so no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        # Reading from the null device yields no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000702
class URandomTests(unittest.TestCase):
    """Length checks for os.urandom()."""

    def test_urandom(self):
        # os.urandom(n) must hand back exactly n bytes for each size tried.
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            # A NotImplementedError from os.urandom() is tolerated and the
            # test simply passes.
            pass
712
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000713@contextlib.contextmanager
714def _execvpe_mockup(defpath=None):
715 """
716 Stubs out execv and execve functions when used as context manager.
717 Records exec calls. The mock execv and execve functions always raise an
718 exception as they would normally never return.
719 """
720 # A list of tuples containing (function name, first arg, args)
721 # of calls to execv or execve that have been made.
722 calls = []
723
724 def mock_execv(name, *args):
725 calls.append(('execv', name, args))
726 raise RuntimeError("execv called")
727
728 def mock_execve(name, *args):
729 calls.append(('execve', name, args))
730 raise OSError(errno.ENOTDIR, "execve called")
731
732 try:
733 orig_execv = os.execv
734 orig_execve = os.execve
735 orig_defpath = os.defpath
736 os.execv = mock_execv
737 os.execve = mock_execve
738 if defpath is not None:
739 os.defpath = defpath
740 yield calls
741 finally:
742 os.execv = orig_execv
743 os.execve = orig_execve
744 os.defpath = orig_defpath
745
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Shared body for the str and bytes variants; *test_type* is either
        # ``str`` or ``bytes`` and selects the type of the program name.
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Off Windows the recorded path is compared in its encoded form.
            native_fullpath = (os.fsencode(fullpath) if os.name != "nt"
                               else fullpath)
        env = {'spam': 'beans'}

        # os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        env_path = env.copy()
        path_key = b'PATH' if test_type is bytes else 'PATH'
        env_path[path_key] = program_path
        with _execvpe_mockup() as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised off Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000809
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000810
class Win32ErrorTests(unittest.TestCase):
    """Each os call below is expected to raise WindowsError."""

    def test_rename(self):
        with self.assertRaises(WindowsError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir over an existing (and still open) file must fail; the file
        # is closed first and then deleted, whatever the outcome.
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(WindowsError):
                os.mkdir(support.TESTFN)
        finally:
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000834
class TestInvalidFD(unittest.TestCase):
    """Check how os functions react to an invalid file descriptor.

    Functions that are missing from the os module on the running platform
    are reported as skipped rather than silently passing.
    """

    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build a test method for os.<f>, a function taking only an fd.
        def helper(self):
            if not hasattr(os, f):
                self.skipTest("test needs os.%s()" % f)
            self.check(getattr(os, f))
        return helper
    # Generate one test_<name> method per single-argument function.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError carrying EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), "test needs os.isatty()")
    def test_isatty(self):
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'),
                         "test needs os.closerange()")
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), "test needs os.dup2()")
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), "test needs os.fchmod()")
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), "test needs os.fchown()")
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'),
                         "test needs os.fpathconf()")
    def test_fpathconf(self):
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'),
                         "test needs os.ftruncate()")
    def test_ftruncate(self):
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), "test needs os.lseek()")
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), "test needs os.read()")
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'),
                         "test needs os.tcsetpgrp()")
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), "test needs os.write()")
    def test_write(self):
        self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000912
Brian Curtin1b9df392010-11-24 20:24:31 +0000913
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Tests may rebind self.file1/self.file2, so read them here.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it as file2 and check that both names
        # open the same underlying file.
        with open(file1, "w") as f1:
            f1.write("test")

        os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding together with its
        # error handler, unlike a plain bytes(name, encoding) conversion.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
948
if sys.platform != 'win32':
    # Empty class bound to the name Win32ErrorTests off Windows.
    class Win32ErrorTests(unittest.TestCase):
        pass

    class PosixUidGidTests(unittest.TestCase):
        """Error-path tests for the set*uid / set*gid family.

        Each test method is only defined when the matching os function
        exists.
        """

        if hasattr(os, 'setuid'):
            def test_setuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setuid(0)
                with self.assertRaises(OverflowError):
                    os.setuid(1<<32)

        if hasattr(os, 'setgid'):
            def test_setgid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setgid(0)
                with self.assertRaises(OverflowError):
                    os.setgid(1<<32)

        if hasattr(os, 'seteuid'):
            def test_seteuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.seteuid(0)
                with self.assertRaises(OverflowError):
                    os.seteuid(1<<32)

        if hasattr(os, 'setegid'):
            def test_setegid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setegid(0)
                with self.assertRaises(OverflowError):
                    os.setegid(1<<32)

        if hasattr(os, 'setreuid'):
            def test_setreuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setreuid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(0, 1<<32)

            def test_setreuid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                command = [
                    sys.executable, '-c',
                    'import os,sys;os.setreuid(-1,-1);sys.exit(0)']
                subprocess.check_call(command)

        if hasattr(os, 'setregid'):
            def test_setregid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setregid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(0, 1<<32)

            def test_setregid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                command = [
                    sys.executable, '-c',
                    'import os,sys;os.setregid(-1,-1);sys.exit(0)']
                subprocess.check_call(command)

    class Pep383Tests(unittest.TestCase):
        """listdir/open/stat on a directory holding non-ASCII file names."""

        def setUp(self):
            if support.TESTFN_UNENCODABLE:
                self.dir = support.TESTFN_UNENCODABLE
            else:
                self.dir = support.TESTFN
            self.bdir = os.fsencode(self.dir)

            # Collect the candidate names that os.fsencode() accepts.
            candidates = [support.TESTFN_UNICODE]
            if support.TESTFN_UNENCODABLE:
                candidates.append(support.TESTFN_UNENCODABLE)
            encoded_names = []
            for candidate in candidates:
                try:
                    encoded_names.append(os.fsencode(candidate))
                except UnicodeEncodeError:
                    continue
            if not encoded_names:
                self.skipTest("couldn't create any non-ascii filename")

            self.unicodefn = set()
            os.mkdir(self.dir)
            try:
                for raw_name in encoded_names:
                    support.create_empty_file(os.path.join(self.bdir, raw_name))
                    decoded = os.fsdecode(raw_name)
                    if decoded in self.unicodefn:
                        raise ValueError("duplicate filename")
                    self.unicodefn.add(decoded)
            except BaseException:
                # Whatever went wrong, remove the directory before
                # re-raising.
                shutil.rmtree(self.dir)
                raise

        def tearDown(self):
            shutil.rmtree(self.dir)

        def test_listdir(self):
            self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)

        def test_open(self):
            for name in self.unicodefn:
                with open(os.path.join(self.dir, name), 'rb'):
                    pass

        def test_stat(self):
            for name in self.unicodefn:
                os.stat(os.path.join(self.dir, name))
else:
    # On win32 both names are bound to empty test cases.
    class PosixUidGidTests(unittest.TestCase):
        pass
    class Pep383Tests(unittest.TestCase):
        pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001061
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Reached when the loop ends without a break: either the retry
            # budget ran out or the subprocess already exited.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start the console-handler helper script, wait until it sets byte 0
        # of the shared mmap to 1, then send *event* with os.kill() and
        # require the subprocess to have exited.  *name* is only used in the
        # failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared mapping whatever the outcome of the test.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the process is still running; an
        # exit status of 0 must not be mistaken for "did not stop".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1175 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1176
1177
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and stat/remove on links under Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions use unittest assertions so they are still checked
        # when Python runs with -O (plain assert statements are stripped).
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is used because the target of this link does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() through the link must equal stat() of the target, while
        # lstat() of the link must differ; checked for str and bytes names.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so the cleanup below can always
        # refer to it, even when creating the directories fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what exists, so a failure during setup is not
            # masked by a second error raised from the cleanup itself.
            if os.path.exists(file1):
                os.remove(file1)
            if os.path.exists(level1):
                shutil.rmtree(level1)
1292 shutil.rmtree(level1)
1293
Brian Curtind40e6f72010-07-08 21:39:08 +00001294
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() come back
        # unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Names that cannot be encoded are not checked.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001307 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001308
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001309
class PidTests(unittest.TestCase):
    """Tests for process-id related functions."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid; since we spawned it, that
        # must be our own pid.
        child_cmd = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(child_cmd, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        self.assertEqual(int(output), os.getpid())
1318 self.assertEqual(int(stdout), os.getpid())
1319
1320
# Introducing this TestCase caused at least two different errors on the
# *nix buildbots, so it is skipped unconditionally for now to let the
# buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Test for os.getlogin()."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        self.assertNotEqual(len(os.getlogin()), 0)
1328 self.assertNotEqual(len(user_name), 0)
1329
1330
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own priority value by one, read it back, then try to
        # put the original value back.
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest(
      "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                # EACCES while restoring the old value is tolerated; any
                # other error is re-raised.
                if err.errno != errno.EACCES:
                    raise
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001352 raise
1353
1354
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening asyncore dispatcher whose polling loop runs in a thread.

        The handler created for the most recently accepted connection is
        kept in ``handler_instance``; it stores everything it receives.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that buffers all received data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                # Everything received so far, as a single bytes object.
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            # Block until run() has signalled that it started.
            self.__flag.wait()

        def stop(self):
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # ``with`` releases the lock even if the poll raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1438
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001439
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() run against a local SendfileTestServer.

    Each test sends (part of) a file created in setUpClass() to the server
    through os.sendfile() and compares what the server received.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # When false, sendfile_wrapper() calls os.sendfile() without the
    # headers/trailers arguments and the related tests are not defined.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        """Start a server, connect a client to it and open the test file.

        Every resource is registered for cleanup right after it has been
        acquired, so nothing (in particular the server thread) is left
        behind when a later step of setUp() fails.  Cleanups run in reverse
        order: file, client socket, then server.
        """
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.addCleanup(self._stop_server)
        self.client = socket.socket()
        self.addCleanup(self.client.close)
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.addCleanup(self.file.close)
        self.fileno = self.file.fileno()

    def _stop_server(self):
        # Tests that call self.server.wait() have already stopped it.
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        The call is retried for as long as it fails with EAGAIN or EBUSY;
        any other OSError propagates.  Returns what os.sendfile() returns.
        *headers* and *trailers* default to empty lists and are only
        forwarded when SUPPORT_HEADERS_TRAILERS is true.
        """
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    # disconnected (ECONNRESET) or any other failure
                    raise
                # otherwise we have to retry sending the data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the lengths first: a mismatch there is easier to read
        # than a diff of the whole payload.
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # a negative offset must be rejected with EINVAL
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            # send a 512 byte header in front of the file content
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            # NOTE(review): this assumes the first call transmitted the
            # first 4096 bytes of the file in full -- confirm that a
            # partial send cannot happen here.
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            # Compare the payload itself, as the other tests do, rather
            # than hash values.
            self.assertEqual(len(data), len(expected_data))
            self.assertEqual(data, expected_data)

        def test_trailers(self):
            # send a small file followed by a trailer
            TESTFN2 = support.TESTFN + "2"
            # Registered before the file is created so that it is removed
            # even if writing or reopening it fails.
            self.addCleanup(support.unlink, TESTFN2)
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcde12345")

        if hasattr(os, "SF_NODISKIO"):
            def test_flags(self):
                # EBUSY / EAGAIN are accepted outcomes with SF_NODISKIO
                try:
                    os.sendfile(self.sockno, self.fileno, 0, 4096,
                                flags=os.SF_NODISKIO)
                except OSError as err:
                    if err.errno not in (errno.EBUSY, errno.EAGAIN):
                        raise
1611
1612
def test_main():
    """Hand the test case classes listed below to support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
    )
    support.run_unittest(*test_classes)
Fred Drake2e2be372001-09-20 21:33:42 +00001636
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()