blob: aa619a8dd7db5af42080df052af5099b0ad0ccc1 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000027try:
28 import threading
29except ImportError:
30 threading = None
Georg Brandl2daf6ae2012-02-20 19:54:16 +010031from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000032
# os.stat_float_times() was deprecated in Python 3.3 and removed in 3.7
# (float timestamps are always on there), so only call it where it exists
# instead of failing at import time with AttributeError.
if hasattr(os, "stat_float_times"):
    os.stat_float_times(True)
st = os.stat(__file__)
# True when os.stat() reports a fractional part on at least one of this
# file's timestamps, i.e. when the float attributes differ from the integer
# values exposed through the tuple interface (indices 7, 8 and 9).
stat_supports_subsecond = (
    # check if float and int timestamps are different
    (st.st_atime != st[7])
    or (st.st_mtime != st[8])
    or (st.st_ctime != st[9]))
40
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
#
# The flag is False whenever sys.thread_info is missing or reports no
# version string; otherwise it says whether that string names linuxthreads.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))
Brian Curtineb24d742010-04-12 17:16:38 +000049
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for the low-level file functions of os; each creates TESTFN."""

    def setUp(self):
        # Doubles as tearDown (see the alias below): remove any TESTFN left
        # over from a previous test so every test starts and ends clean.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # rewind the descriptor before reading the data back
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # close the descriptor even when a check above fails, so a
            # failing test does not leak it
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console; expect exit status 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap it with os.fdopen(fd, *args), close."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # create the file that fdopen_helper() reopens
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        # the existing destination must be overwritten by the source
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
156
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200157
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for os.stat()/os.statvfs() results and the utime() family.

    Each test runs against a scratch directory TESTFN that holds a single
    three-byte file, "f1" (available as self.fname).
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    @staticmethod
    def _split_timestamp(timestamp):
        """Return *timestamp* (float seconds) as (seconds, nanoseconds) ints."""
        seconds, fraction = divmod(timestamp, 1.0)
        return int(seconds), int(fraction * 1e9)

    def check_stat_attributes(self, fname):
        """Check the os.stat() result for *fname* (a str or bytes path)."""
        if not hasattr(os, "stat"):
            return

        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # the tuple interface only carries integer timestamps
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to the tens-of-microseconds magnitude).
        # The float value is rounded, so after truncation the two may still
        # differ by a unit; hence the tolerance instead of strict equality.
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            self.skipTest("os.statvfs() is not available")

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest("os.statvfs() failed with ENOSYS")
            # Any other error is a real failure; without re-raising, the
            # checks below would trip over an unbound 'result' instead.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for index, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[index])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def test_utime_noargs(self):
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st = os.stat(support.TESTFN)
        # Doesn't set anything new, but sets the time tuple way
        os.utime(support.TESTFN, (st.st_atime, st.st_mtime))
        # Set to the current time in the old explicit way.
        os.utime(support.TESTFN, None)
        st1 = os.stat(support.TESTFN)
        # Set to the current time in the new way
        os.utime(support.TESTFN)
        st2 = os.stat(support.TESTFN)
        self.assertAlmostEqual(st1.st_mtime, st2.st_mtime, delta=10)

    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        """Set sub-second times on self.fname and check os.stat() sees them.

        *set_time_func(filename, atime, mtime)* performs the actual update;
        the result must match to three decimal places.
        """
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        # reset both timestamps so that a no-op set_time_func is detected
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        # os.stat_float_times() no longer exists on newer interpreters,
        # where float timestamps are always enabled.
        if hasattr(os, "stat_float_times"):
            os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimes'),
                         "os.futimes required for this test.")
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.futimes(f.fileno(), (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimens'),
                         "os.futimens required for this test.")
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.futimens(f.fileno(),
                            self._split_timestamp(atime),
                            self._split_timestamp(mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimesat'),
                         "os.futimesat required for this test.")
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.futimesat(dirfd, os.path.basename(filename),
                             (atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'lutimes'),
                         "os.lutimes required for this test.")
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.lutimes(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'utimensat'),
                         "os.utimensat required for this test.")
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utimensat(dirfd, os.path.basename(filename),
                             self._split_timestamp(atime),
                             self._split_timestamp(mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            """Return the file system name of *path*'s volume.

            Returns None when GetVolumeInformationW() reports failure.
            """
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    self.skipTest(r"c:\pagefile.sys does not exist")
                self.fail("Could not stat pagefile.sys")
435
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000436from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000437
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment so that tearDown can restore it, then
        # seed it with the reference mapping.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    # Skipped (rather than silently passing) when there is no /bin/sh.
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    # Skipped (rather than silently passing) when there is no /bin/sh.
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_os_popen_iter(self):
        with os.popen(
            "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
            it = iter(popen)
            self.assertEqual(next(it), "line1\n")
            self.assertEqual(next(it), "line2\n")
            self.assertEqual(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100578
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def setUp(self):
        """Build the directory tree that the walk tests traverse.

        Only the fixture is created here; the os.walk() checks live in
        test_traversal(), so a failing check is reported as a test failure
        and tearDown() still removes the tree.
        """
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        self.sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(self.sub2_path, "tmp3")
        self.link_path = join(self.sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(self.sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        if support.can_symlink():
            if os.name == 'nt':
                def symlink_to_dir(src, dest):
                    os.symlink(src, dest, True)
            else:
                symlink_to_dir = os.symlink
            symlink_to_dir(os.path.abspath(t2_path), self.link_path)
            # expected walk entry for SUB2 when the symlink exists
            self.sub2_tree = (self.sub2_path, ["link"], ["tmp3"])
        else:
            self.sub2_tree = (self.sub2_path, [], ["tmp3"])

    def test_traversal(self):
        """Walk the tree top-down, pruned, bottom-up and following links."""
        walk_path = self.walk_path
        sub1_path = self.sub1_path
        sub11_path = self.sub11_path
        sub2_tree = self.sub2_tree

        # Walk top-down.
        entries = list(os.walk(walk_path))
        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = entries[0][1][0] != "SUB1"
        entries[0][1].sort()
        self.assertEqual(entries[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 + flipped], (sub11_path, [], []))
        self.assertEqual(entries[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        entries = []
        for root, dirs, files in os.walk(walk_path):
            entries.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to entries!
                dirs.remove('SUB1')
        self.assertEqual(len(entries), 2)
        self.assertEqual(entries[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(entries[1], sub2_tree)

        # Walk bottom-up.
        entries = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = entries[3][1][0] != "SUB1"
        entries[3][1].sort()
        self.assertEqual(entries[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[flipped], (sub11_path, [], []))
        self.assertEqual(entries[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == self.link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # a symlink to a directory is removed like a file
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000691
Charles-François Natali7372b062012-02-05 15:15:38 +0100692
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def test_compare_to_walk(self):
        # For every (topdown, followlinks) combination, each directory
        # yielded by fwalk() must match what walk() reported for it.
        for topdown in (True, False):
            for followlinks in (True, False):
                walk_args = (support.TESTFN, topdown, None, followlinks)
                expected = {
                    root: (set(dirs), set(files))
                    for root, dirs, files in os.walk(*walk_args)
                }
                for root, dirs, files, rootfd in os.fwalk(*walk_args):
                    self.assertIn(root, expected)
                    self.assertEqual(expected[root],
                                     (set(dirs), set(files)))

    def test_dir_fd(self):
        # The directory file descriptor yielded with each entry must be
        # usable and must describe the same directory content.
        for topdown in (True, False):
            for followlinks in (True, False):
                walk_args = (support.TESTFN, topdown, None, followlinks)
                for root, dirs, files, rootfd in os.fwalk(*walk_args):
                    # fstat() raises if the descriptor is not valid
                    os.fstat(rootfd)
                    # flistdir() must agree with the yielded names
                    listed = set(os.flistdir(rootfd))
                    self.assertEqual(listed, set(dirs) | set(files))

    def test_fd_leak(self):
        # fwalk() opens many descriptors.  Run it a large number of times
        # (a leak would eventually yield EMFILE) and then check that the
        # lowest descriptor handed out by dup() is the same as before.
        baseline_fd = os.dup(1)
        os.close(baseline_fd)
        for _ in range(256):
            for _entry in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, baseline_fd)

    def tearDown(self):
        # Bottom-up removal of the test tree, addressing every entry
        # relative to the directory descriptor yielded by fwalk().
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN,
                                                  topdown=False):
            for filename in files:
                os.unlinkat(rootfd, filename)
            for dirname in dirs:
                info = os.fstatat(rootfd, dirname, os.AT_SYMLINK_NOFOLLOW)
                if not stat.S_ISDIR(info.st_mode):
                    # not a real directory (lstat-style check): unlink it
                    os.unlinkat(rootfd, dirname)
                else:
                    os.unlinkat(rootfd, dirname, os.AT_REMOVEDIR)
        os.rmdir(support.TESTFN)
744
745
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh support.TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5',
                            os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # The umask is process-wide state: restore it even when one of the
        # assertions below fails, so later tests are not affected.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode,
                              exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776,
                              exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even if an assertion fails: tearDown() calls
        # os.removedirs() and cannot cope with a regular file named 'dir1'.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
797
class DevNullTests(unittest.TestCase):
    """Check that os.devnull accepts writes and reads back as empty."""

    def test_devnull(self):
        # The with statement closes the file; no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000805
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() output from a child process.

        The child interpreter writes the raw bytes to its stdout; the
        captured output is checked to be exactly *count* bytes long before
        it is returned.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare against the requested size rather than a fixed 16 so the
        # helper is correct for any *count*.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate processes must not produce the same random data.
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000834
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000835@contextlib.contextmanager
836def _execvpe_mockup(defpath=None):
837 """
838 Stubs out execv and execve functions when used as context manager.
839 Records exec calls. The mock execv and execve functions always raise an
840 exception as they would normally never return.
841 """
842 # A list of tuples containing (function name, first arg, args)
843 # of calls to execv or execve that have been made.
844 calls = []
845
846 def mock_execv(name, *args):
847 calls.append(('execv', name, args))
848 raise RuntimeError("execv called")
849
850 def mock_execve(name, *args):
851 calls.append(('execve', name, args))
852 raise OSError(errno.ENOTDIR, "execve called")
853
854 try:
855 orig_execv = os.execv
856 orig_execve = os.execve
857 orig_defpath = os.defpath
858 os.execv = mock_execv
859 os.execve = mock_execve
860 if defpath is not None:
861 os.defpath = defpath
862 yield calls
863 finally:
864 os.execv = orig_execv
865 os.execve = orig_execve
866 os.defpath = orig_defpath
867
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with the exec functions stubbed out and check
        # which stub was called, and with which arguments.  test_type is
        # either str or bytes and selects the type of the program name.
        use_bytes = test_type is bytes
        program_path = os.sep + 'absolutepath'
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Outside of Windows the recorded path is expected as bytes.
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # Absolute path: execv is expected to be called directly.
        with _execvpe_mockup() as calls:
            self.assertRaises(RuntimeError,
                              os._execvpe, fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # Relative path, no PATH in env:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # Relative path, PATH given in env:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath,
                             (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised outside of Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000931
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000932
class Win32ErrorTests(unittest.TestCase):
    """Each operation below must fail with WindowsError.

    Except in test_mkdir, the tests do not create support.TESTFN before
    operating on it.
    """

    def test_rename(self):
        with self.assertRaises(WindowsError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir() must fail while a file with the same name exists.
        existing = open(support.TESTFN, "w")
        try:
            with self.assertRaises(WindowsError):
                os.mkdir(support.TESTFN)
        finally:
            existing.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000956
class TestInvalidFD(unittest.TestCase):
    """Check how os functions behave when given a bad file descriptor."""

    # Functions whose only argument is the file descriptor; one test
    # method per name is generated by the loop below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        def helper(self):
            # Nothing is checked when the platform lacks the function.
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError carrying EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    def test_isatty(self):
        if not hasattr(os, "isatty"):
            return
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    def test_closerange(self):
        if not hasattr(os, "closerange"):
            return
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).  The loop stops at the first
        # offset whose descriptor fstat() accepts.
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    def test_dup2(self):
        if not hasattr(os, "dup2"):
            return
        self.check(os.dup2, 20)

    def test_fchmod(self):
        if not hasattr(os, "fchmod"):
            return
        self.check(os.fchmod, 0)

    def test_fchown(self):
        if not hasattr(os, "fchown"):
            return
        self.check(os.fchown, -1, -1)

    def test_fpathconf(self):
        if not hasattr(os, "fpathconf"):
            return
        self.check(os.fpathconf, "PC_NAME_MAX")

    def test_ftruncate(self):
        if not hasattr(os, "ftruncate"):
            return
        self.check(os.ftruncate, 0)

    def test_lseek(self):
        if not hasattr(os, "lseek"):
            return
        self.check(os.lseek, 0, 0)

    def test_read(self):
        if not hasattr(os, "read"):
            return
        self.check(os.read, 1)

    def test_tcsetpgrpt(self):
        if not hasattr(os, "tcsetpgrp"):
            return
        self.check(os.tcsetpgrp, 0)

    def test_write(self):
        if not hasattr(os, "write"):
            return
        self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001034
Brian Curtin1b9df392010-11-24 20:24:31 +00001035
class LinkTests(unittest.TestCase):
    """Tests for os.link() using str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files a test left behind.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it as file2, then check that both names
        # open the same underlying file.
        with open(file1, "w") as f1:
            f1.write("test")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            same = os.path.sameopenfile(f1.fileno(), f2.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        source = self.file1.encode(sys.getfilesystemencoding())
        target = self.file2.encode(sys.getfilesystemencoding())
        self._test_link(source, target)

    def test_unicode_name(self):
        # Skip when the platform cannot encode the non-ASCII character.
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1072
if sys.platform != 'win32':
    # Outside of Windows the Win32-only test case becomes an empty class.
    class Win32ErrorTests(unittest.TestCase):
        pass

    class PosixUidGidTests(unittest.TestCase):
        """Tests for the os.set*uid() / os.set*gid() family.

        Each test is only defined when os provides the function.  A
        process whose uid is not 0 must get os.error when asking for
        id 0, and an id of 1<<32 must raise OverflowError.
        """

        if hasattr(os, 'setuid'):
            def test_setuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setuid(0)
                with self.assertRaises(OverflowError):
                    os.setuid(1<<32)

        if hasattr(os, 'setgid'):
            def test_setgid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setgid(0)
                with self.assertRaises(OverflowError):
                    os.setgid(1<<32)

        if hasattr(os, 'seteuid'):
            def test_seteuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.seteuid(0)
                with self.assertRaises(OverflowError):
                    os.seteuid(1<<32)

        if hasattr(os, 'setegid'):
            def test_setegid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setegid(0)
                with self.assertRaises(OverflowError):
                    os.setegid(1<<32)

        if hasattr(os, 'setreuid'):
            def test_setreuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setreuid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(0, 1<<32)

            def test_setreuid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                command = [
                    sys.executable, '-c',
                    'import os,sys;os.setreuid(-1,-1);sys.exit(0)']
                subprocess.check_call(command)

        if hasattr(os, 'setregid'):
            def test_setregid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setregid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(0, 1<<32)

            def test_setregid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                command = [
                    sys.executable, '-c',
                    'import os,sys;os.setregid(-1,-1);sys.exit(0)']
                subprocess.check_call(command)

    class Pep383Tests(unittest.TestCase):
        """listdir/open/stat on a directory holding non-ASCII file names."""

        def setUp(self):
            if support.TESTFN_UNENCODABLE:
                self.dir = support.TESTFN_UNENCODABLE
            else:
                self.dir = support.TESTFN
            self.bdir = os.fsencode(self.dir)

            # Collect the candidate names that os.fsencode() accepts.
            candidates = [support.TESTFN_UNICODE]
            if support.TESTFN_UNENCODABLE:
                candidates.append(support.TESTFN_UNENCODABLE)
            bytesfn = []
            for candidate in candidates:
                try:
                    encoded = os.fsencode(candidate)
                except UnicodeEncodeError:
                    continue
                bytesfn.append(encoded)
            if not bytesfn:
                self.skipTest("couldn't create any non-ascii filename")

            self.unicodefn = set()
            os.mkdir(self.dir)
            try:
                for fn in bytesfn:
                    support.create_empty_file(os.path.join(self.bdir, fn))
                    fn = os.fsdecode(fn)
                    if fn in self.unicodefn:
                        raise ValueError("duplicate filename")
                    self.unicodefn.add(fn)
            except:
                # Whatever went wrong, do not leave the directory behind;
                # the exception is re-raised after the cleanup.
                shutil.rmtree(self.dir)
                raise

        def tearDown(self):
            shutil.rmtree(self.dir)

        def test_listdir(self):
            found = set(os.listdir(self.dir))
            self.assertEqual(found, self.unicodefn)

        def test_open(self):
            # Every created file must be openable through its str name.
            for fn in self.unicodefn:
                with open(os.path.join(self.dir, fn), 'rb'):
                    pass

        def test_stat(self):
            for fn in self.unicodefn:
                os.stat(os.path.join(self.dir, fn))
else:
    # On Windows the POSIX-only test cases become empty classes.
    class PosixUidGidTests(unittest.TestCase):
        pass
    class Pep383Tests(unittest.TestCase):
        pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001185
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for the readiness message, at most max_tries times.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Send the console control *event* to a child process and fail,
        # naming the event with *name*, if the child does not stop.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        # The loop waits for the byte in the tagged mapping to become 1.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # The loop also ends when the child has already exited; only
            # kill it forcefully when it is still running.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit code of 0 must not be mistaken for "still running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1300
1301
Brian Curtind40e6f72010-07-08 21:39:08 +00001302@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001303@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001304class Win32SymlinkTests(unittest.TestCase):
1305 filelink = 'filelinktest'
1306 filelink_target = os.path.abspath(__file__)
1307 dirlink = 'dirlinktest'
1308 dirlink_target = os.path.dirname(filelink_target)
1309 missing_link = 'missing link'
1310
1311 def setUp(self):
1312 assert os.path.exists(self.dirlink_target)
1313 assert os.path.exists(self.filelink_target)
1314 assert not os.path.exists(self.dirlink)
1315 assert not os.path.exists(self.filelink)
1316 assert not os.path.exists(self.missing_link)
1317
1318 def tearDown(self):
1319 if os.path.exists(self.filelink):
1320 os.remove(self.filelink)
1321 if os.path.exists(self.dirlink):
1322 os.rmdir(self.dirlink)
1323 if os.path.lexists(self.missing_link):
1324 os.remove(self.missing_link)
1325
1326 def test_directory_link(self):
Antoine Pitrou5311c1d2012-01-24 08:59:28 +01001327 os.symlink(self.dirlink_target, self.dirlink, True)
Brian Curtind40e6f72010-07-08 21:39:08 +00001328 self.assertTrue(os.path.exists(self.dirlink))
1329 self.assertTrue(os.path.isdir(self.dirlink))
1330 self.assertTrue(os.path.islink(self.dirlink))
1331 self.check_stat(self.dirlink, self.dirlink_target)
1332
1333 def test_file_link(self):
1334 os.symlink(self.filelink_target, self.filelink)
1335 self.assertTrue(os.path.exists(self.filelink))
1336 self.assertTrue(os.path.isfile(self.filelink))
1337 self.assertTrue(os.path.islink(self.filelink))
1338 self.check_stat(self.filelink, self.filelink_target)
1339
1340 def _create_missing_dir_link(self):
1341 'Create a "directory" link to a non-existent target'
1342 linkname = self.missing_link
1343 if os.path.lexists(linkname):
1344 os.remove(linkname)
1345 target = r'c:\\target does not exist.29r3c740'
1346 assert not os.path.exists(target)
1347 target_is_dir = True
1348 os.symlink(target, linkname, target_is_dir)
1349
1350 def test_remove_directory_link_to_missing_target(self):
1351 self._create_missing_dir_link()
1352 # For compatibility with Unix, os.remove will check the
1353 # directory status and call RemoveDirectory if the symlink
1354 # was created with target_is_dir==True.
1355 os.remove(self.missing_link)
1356
1357 @unittest.skip("currently fails; consider for improvement")
1358 def test_isdir_on_directory_link_to_missing_target(self):
1359 self._create_missing_dir_link()
1360 # consider having isdir return true for directory links
1361 self.assertTrue(os.path.isdir(self.missing_link))
1362
@unittest.skip("currently fails; consider for improvement")
def test_rmdir_on_directory_link_to_missing_target(self):
    # consider allowing rmdir to remove directory links
    self._create_missing_dir_link()
    dangling_link = self.missing_link
    os.rmdir(dangling_link)
1368
def check_stat(self, link, target):
    # Verify that os.stat() follows the link (same result as the target)
    # while os.lstat() does not, for both the str and bytes link path.
    def assert_followed(path):
        self.assertEqual(os.stat(path), os.stat(target))
        self.assertNotEqual(os.lstat(path), os.stat(path))

    assert_followed(link)

    encoded_link = os.fsencode(link)
    with warnings.catch_warnings():
        # Silence any DeprecationWarning emitted for the bytes path.
        warnings.simplefilter("ignore", DeprecationWarning)
        assert_followed(encoded_link)
Brian Curtind25aef52011-06-13 15:16:04 -05001378
def test_12084(self):
    # os.stat() on a symlink created with a relative target must resolve
    # to the target whichever directory is current when it is called.
    level1 = os.path.abspath(support.TESTFN)
    level2 = os.path.join(level1, "level2")
    level3 = os.path.join(level2, "level3")

    os.mkdir(level1)
    # Register the cleanup as soon as the tree exists.  Unlike the former
    # ``finally: os.remove(file1)``, this cannot hit an unbound ``file1``
    # when an early step fails, and rmtree also removes file1 and the link.
    self.addCleanup(shutil.rmtree, level1)
    os.mkdir(level2)
    os.mkdir(level3)

    file1 = os.path.abspath(os.path.join(level1, "file1"))
    with open(file1, "w") as f:
        f.write("file1")

    # OSError is deliberately left to propagate so the report carries the
    # full traceback instead of a bare self.fail(err) message.
    orig_dir = os.getcwd()
    try:
        os.chdir(level2)
        link = os.path.join(level2, "link")
        os.symlink(os.path.relpath(file1), "link")
        self.assertIn("link", os.listdir(os.getcwd()))

        # Check os.stat calls from the same dir as the link
        self.assertEqual(os.stat(file1), os.stat("link"))

        # Check os.stat calls from a dir below the link
        os.chdir(level1)
        self.assertEqual(os.stat(file1),
                         os.stat(os.path.relpath(link)))

        # Check os.stat calls from a dir above the link
        os.chdir(level3)
        self.assertEqual(os.stat(file1),
                         os.stat(os.path.relpath(link)))
    finally:
        # Leave the tree before the cleanup tries to delete it.
        os.chdir(orig_dir)
1419
Brian Curtind40e6f72010-07-08 21:39:08 +00001420
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # A value that already has the result type comes back equal.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        names = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in names:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Name cannot be encoded here; nothing to round-trip.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001434
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001435
Brett Cannonefb00c02012-02-29 18:31:31 -05001436
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The encoding reported for fd 0 must name a codec Python knows.
        stdin_encoding = os.device_encoding(0)
        self.assertIsNotNone(stdin_encoding)
        self.assertTrue(codecs.lookup(stdin_encoding))
1450
1451
class PidTests(unittest.TestCase):
    """Checks for os.getppid()."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1461
1462
Brian Curtin0151b8e2010-09-24 13:43:43 +00001463# The introduction of this TestCase caused at least two different errors on
1464# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check for os.getlogin(); currently skipped unconditionally."""

    def test_getlogin(self):
        # The reported login name must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
1471
1472
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own nice value by one, read it back, then try to
        # restore the initial value.
        which = os.PRIO_PROCESS
        base = os.getpriority(which, os.getpid())
        os.setpriority(which, os.getpid(), base + 1)
        try:
            new_prio = os.getpriority(which, os.getpid())
            if base >= 19 and new_prio <= 19:
                # The increment cannot be observed reliably in this case.
                raise unittest.SkipTest(
                    "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(which, os.getpid(), base)
            except OSError as err:
                # Only EACCES on restore is tolerated.
                if err.errno != errno.EACCES:
                    raise
1495
1496
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket served by an asyncore loop in its own thread.

        Each accepted connection is wrapped in a ``Handler`` which greets
        the peer and accumulates everything it receives; the most recent
        handler is available as ``handler_instance``.
        """

        class Handler(asynchat.async_chat):
            """Connection handler that records every chunk it reads."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return all bytes received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the server thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to exit and wait for the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # ``with`` releases the lock even if the loop raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1580
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001581
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer.

    DATA is written once to support.TESTFN; every test opens that file,
    connects a client socket to a fresh server and pushes file content
    through os.sendfile(), then compares what the server received.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # Platforms on which the headers/trailers arguments are not exercised.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
                               not sys.platform.startswith("solaris") and \
                               not sys.platform.startswith("sunos")

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError (ECONNRESET included) propagates.  Returns whatever
        os.sendfile() returns.  *headers* and *trailers* default to empty
        lists and are only passed on when SUPPORT_HEADERS_TRAILERS is true.
        """
        # None defaults avoid sharing one mutable list between calls.
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare lengths first: a mismatch there fails with a short message.
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            # Compare the payloads themselves, not their hashes: equal
            # hashes do not prove equal data.  Length first keeps the
            # failure message short.
            self.assertEqual(len(data), len(expected_data))
            self.assertEqual(data, expected_data)

        def test_trailers(self):
            TESTFN2 = support.TESTFN + "2"
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            # Register removal once the file exists so it is cleaned up
            # even if re-opening or sending fails.
            self.addCleanup(os.remove, TESTFN2)
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcde12345")

        if hasattr(os, "SF_NODISKIO"):
            def test_flags(self):
                try:
                    os.sendfile(self.sockno, self.fileno, 0, 4096,
                                flags=os.SF_NODISKIO)
                except OSError as err:
                    # EBUSY/EAGAIN are tolerated here.
                    if err.errno not in (errno.EBUSY, errno.EAGAIN):
                        raise
1753
1754
def supports_extended_attributes():
    """Return True if extended attributes look usable for these tests.

    False is returned when ``os`` has no ``setxattr``, when setting a
    ``user.test`` attribute on a scratch file raises OSError, or when
    ``platform.release()`` reports a 2.6.x kernel older than 2.6.39.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.fsetxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: ``\d`` is not a valid str escape and an
    # unescaped ``.`` would match any character.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
1770
1771
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise the get/set/remove/list xattr families of functions."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _assert_xattr_errno(self, expected_errno, func, *args):
        # func(*args) must raise OSError carrying expected_errno.
        with self.assertRaises(OSError) as cm:
            func(*args)
        self.assertEqual(cm.exception.errno, expected_errno)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
        # Run one scenario against a fresh file using the given four
        # callables; *s* converts each attribute name to the type under
        # test before it is passed on.
        fn = support.TESTFN
        open(fn, "wb").close()

        # Nothing is set yet.
        self._assert_xattr_errno(errno.ENODATA, getxattr, fn, s("user.test"))
        init_xattr = listxattr(fn)
        self.assertIsInstance(init_xattr, list)

        # Create an empty attribute and check it is listed and readable.
        setxattr(fn, s("user.test"), b"")
        expected_names = set(init_xattr)
        expected_names.add("user.test")
        self.assertEqual(set(listxattr(fn)), expected_names)
        self.assertEqual(getxattr(fn, b"user.test"), b"")

        # XATTR_REPLACE needs an existing attribute, XATTR_CREATE a new one.
        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE)
        self.assertEqual(getxattr(fn, b"user.test"), b"hello")
        self._assert_xattr_errno(errno.EEXIST, setxattr,
                                 fn, s("user.test"), b"bye", os.XATTR_CREATE)
        self._assert_xattr_errno(errno.ENODATA, setxattr,
                                 fn, s("user.test2"), b"bye", os.XATTR_REPLACE)
        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE)
        expected_names.add("user.test2")
        self.assertEqual(set(listxattr(fn)), expected_names)

        # Removal makes the attribute unreadable and unlisted.
        removexattr(fn, s("user.test"))
        self._assert_xattr_errno(errno.ENODATA, getxattr, fn, s("user.test"))
        expected_names.remove("user.test")
        self.assertEqual(set(listxattr(fn)), expected_names)
        self.assertEqual(getxattr(fn, s("user.test2")), b"foo")

        # A 1024-byte value round-trips.
        big_value = b"a"*1024
        setxattr(fn, s("user.test"), big_value)
        self.assertEqual(getxattr(fn, s("user.test")), big_value)
        removexattr(fn, s("user.test"))

        # Many attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for name in many:
            setxattr(fn, name, b"x")
        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args):
        # Run the scenario once with str names, then once with bytes names.
        def make_bytes(s):
            return bytes(s, "ascii")
        self._check_xattrs_str(str, *args)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(make_bytes, *args)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
                           os.llistxattr)

    def test_fds(self):
        # Adapt the fd-based functions to the path-based call shape by
        # opening the path for each call.
        def getxattr(path, *args):
            with open(path, "rb") as stream:
                return os.fgetxattr(stream.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb") as stream:
                os.fsetxattr(stream.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb") as stream:
                os.fremovexattr(stream.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as stream:
                return os.flistxattr(stream.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1847
1848
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Check that bytes filenames raise DeprecationWarning on Windows."""

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # Each entry is (callable, *arguments).
        calls = (
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        )
        with warnings.catch_warnings():
            # Turn the warning into an exception so assertRaises sees it.
            warnings.simplefilter("error", DeprecationWarning)
            for call in calls:
                func, args = call[0], call[1:]
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        encoded_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            self.assertRaises(DeprecationWarning,
                              os.symlink, encoded_name, encoded_name)
1884
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001885
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Tests for os.get_terminal_size()."""

    def _terminal_size_or_skip(self, *args):
        # Return os.get_terminal_size(*args).  Skip the test if it raises
        # OSError on win32, or with EINVAL/ENOTTY elsewhere; any other
        # OSError propagates.
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stderr is discarded so stty's complaint about a non-tty
            # stdin does not pollute the test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            # stty missing, not executable, or exited with an error.
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
1928
1929
@support.reap_threads
def test_main():
    # Run every test case of this module, in this order.
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
    )
    support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()