blob: da5a130d6c6851b154b42d140a958c119d71c61e [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner138adb82015-06-12 22:01:54 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner138adb82015-06-12 22:01:54 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner138adb82015-06-12 22:01:54 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import platform
19import re
Victor Stinner138adb82015-06-12 22:01:54 +020020import shutil
21import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000022import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import stat
Victor Stinner138adb82015-06-12 22:01:54 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinner138adb82015-06-12 22:01:54 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000032try:
33 import threading
34except ImportError:
35 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020036try:
37 import resource
38except ImportError:
39 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020040try:
41 import fcntl
42except ImportError:
43 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010044try:
45 import _winapi
46except ImportError:
47 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020048try:
R David Murrayf2ad1732014-12-25 18:36:56 -050049 import grp
50 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
51 if hasattr(os, 'getgid'):
52 process_gid = os.getgid()
53 if process_gid not in groups:
54 groups.append(process_gid)
55except ImportError:
56 groups = []
57try:
58 import pwd
59 all_users = [u.pw_uid for u in pwd.getpwall()]
60except ImportError:
61 all_users = []
62try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020064except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020065 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020066
Berker Peksagce643912015-05-06 06:33:17 +030067from test.support.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000068
# True only when os.geteuid() exists and reports an effective uid of 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Exercise low-level os file functions against support.TESTFN."""

    def setUp(self):
        # Remove any leftover TESTFN.  lexists() is used rather than exists()
        # so that a symlink whose target is missing (such as the one created
        # by test_symlink_keywords) is detected and removed as well.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same removal doubles as the post-test cleanup.
    tearDown = setUp

    def test_access(self):
        # A freshly created file must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A rename that fails with TypeError (the destination is not a path)
        # must leave the reference count of the source path unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() on a descriptor returns the written data as bytes.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        with open(support.TESTFN, "wb") as fp:
            fp.write(b'test')
        self.addCleanup(support.unlink, support.TESTFN)

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when one of the checks above fails,
            # so a failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command line given by *args* and require exit status 0.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the descriptor with os.fdopen(*args)
        # and close the resulting file object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination and remove
        # the source name.
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every os.open() parameter must be accepted by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every os.symlink() parameter must be accepted by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
224
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200225
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Check the result objects returned by os.stat() and os.statvfs()."""

    def setUp(self):
        # Fixture: a directory TESTFN containing a 3-byte file "f1".
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def _statvfs_or_skip(self):
        # Return os.statvfs(self.fname).  The calling test is skipped when
        # the call fails with ENOSYS; any other OSError is re-raised so it is
        # reported instead of surfacing later as an unbound result.
        try:
            return os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            raise

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        # Shared body of the stat attribute tests; *fname* is the path handed
        # to os.stat() (str or bytes, depending on the caller).
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # The *TIME attributes are truncated with int() before
                    # being compared with the corresponding tuple item.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end of the tuple must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Both outcomes are
        # accepted here: the call may succeed or raise TypeError.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        # Same checks with the file name passed as bytes.
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        # A stat_result must round-trip through every pickle protocol.
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = self._statvfs_or_skip()

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  Both outcomes are
        # accepted here: the call may succeed or raise TypeError.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        # A statvfs_result must round-trip through every pickle protocol.
        result = self._statvfs_or_skip()

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        # Once closed, the descriptor must be rejected with EBADF.
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        # st_file_attributes must exist and be an int that fits in 32 bits.
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        result = os.stat(support.TESTFN)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)
434
Victor Stinner138adb82015-06-12 22:01:54 +0200435
class UtimeTests(unittest.TestCase):
    """Tests for the different ways of calling os.utime()."""

    def setUp(self):
        # Fixture: a directory TESTFN containing a 3-byte file "f1"; the
        # whole tree is removed again through the registered cleanup.
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        with open(self.fname, 'wb') as fp:
            fp.write(b"ABC")

        def restore_float_times(state):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)

                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)

            # Remember the current setting and restore it after the test.
            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        # Call set_time(filename, (atime_ns, mtime_ns)) and verify that
        # os.stat() then reports those timestamps.  *filename* defaults to
        # the fixture file.
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb') as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        # Call set_time(self.fname), which is expected to set the timestamps
        # to "now", and verify that st_mtime is close to the system clock.

        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms
            delta = 0.020
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        # Return the name of the file system holding *path* as reported by
        # GetVolumeInformationW() on Windows.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
629
630
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000631from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000632
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000633class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000634 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000635 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000636
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000637 def setUp(self):
638 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000639 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000640 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000641 for key, value in self._reference().items():
642 os.environ[key] = value
643
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000644 def tearDown(self):
645 os.environ.clear()
646 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000647 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000648 os.environb.clear()
649 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000650
Christian Heimes90333392007-11-01 19:08:42 +0000651 def _reference(self):
652 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
653
654 def _empty_mapping(self):
655 os.environ.clear()
656 return os.environ
657
    # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300659 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000660 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000661 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300662 os.environ.update(HELLO="World")
663 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
664 value = popen.read().strip()
665 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000666
Ezio Melottic7e139b2012-09-26 20:01:34 +0300667 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000668 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300669 with os.popen(
670 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
671 it = iter(popen)
672 self.assertEqual(next(it), "line1\n")
673 self.assertEqual(next(it), "line2\n")
674 self.assertEqual(next(it), "line3\n")
675 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000676
    # Verify environ keys and values from the OS are of the
    # correct str type.
679 def test_keyvalue_types(self):
680 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000681 self.assertEqual(type(key), str)
682 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000683
Christian Heimes90333392007-11-01 19:08:42 +0000684 def test_items(self):
685 for key, value in self._reference().items():
686 self.assertEqual(os.environ.get(key), value)
687
    # Issue 7310
689 def test___repr__(self):
690 """Check that the repr() of os.environ looks like environ({...})."""
691 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000692 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
693 '{!r}: {!r}'.format(key, value)
694 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000695
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000696 def test_get_exec_path(self):
697 defpath_list = os.defpath.split(os.pathsep)
698 test_path = ['/monty', '/python', '', '/flying/circus']
699 test_env = {'PATH': os.pathsep.join(test_path)}
700
701 saved_environ = os.environ
702 try:
703 os.environ = dict(test_env)
704 # Test that defaulting to os.environ works.
705 self.assertSequenceEqual(test_path, os.get_exec_path())
706 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
707 finally:
708 os.environ = saved_environ
709
710 # No PATH environment variable
711 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
712 # Empty PATH environment variable
713 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
714 # Supplied PATH environment variable
715 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
716
Victor Stinnerb745a742010-05-18 17:17:23 +0000717 if os.supports_bytes_environ:
718 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000719 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000720 # ignore BytesWarning warning
721 with warnings.catch_warnings(record=True):
722 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000723 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000724 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000725 pass
726 else:
727 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000728
729 # bytes key and/or value
730 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
731 ['abc'])
732 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
733 ['abc'])
734 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
735 ['abc'])
736
737 @unittest.skipUnless(os.supports_bytes_environ,
738 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000739 def test_environb(self):
740 # os.environ -> os.environb
741 value = 'euro\u20ac'
742 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000743 value_bytes = value.encode(sys.getfilesystemencoding(),
744 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000745 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000746 msg = "U+20AC character is not encodable to %s" % (
747 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000748 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000749 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000750 self.assertEqual(os.environ['unicode'], value)
751 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000752
753 # os.environb -> os.environ
754 value = b'\xff'
755 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000756 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000757 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000758 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000759
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    # Deleting a variable with an invalid name must raise instead of
    # silently succeeding; the invalid name and error type are per-platform.
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        key = 'x' * 50000
        self.assertRaises(ValueError, os.environ.__delitem__, key)
    else:
        # "=" is not allowed in a variable name
        key = 'key='
        self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100773
Victor Stinner6d101392013-04-14 16:35:04 +0200774 def test_key_type(self):
775 missing = 'missingkey'
776 self.assertNotIn(missing, os.environ)
777
Victor Stinner839e5ea2013-04-14 16:43:03 +0200778 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200779 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200780 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200781 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200782
Victor Stinner839e5ea2013-04-14 16:43:03 +0200783 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200784 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200785 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200786 self.assertTrue(cm.exception.__suppress_context__)
787
Victor Stinner6d101392013-04-14 16:35:04 +0200788
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # so that both functions are tested with the same code base.
    def walk(self, directory, topdown=True, follow_symlinks=False):
        """Yield (root, dirs, files) triples produced by os.walk()."""
        walk_it = os.walk(directory,
                          topdown=topdown,
                          followlinks=follow_symlinks)
        for root, dirs, files in walk_it:
            yield (root, dirs, files)

    def setUp(self):
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # The context manager closes the file even if write() fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        # Expected (root, dirs, files) triple for SUB2; it depends on
        # whether the platform lets us create symlinks.
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        # Walk top-down.  Go through self.walk() so that subclasses which
        # override the wrapper run this test against their own walker.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(self.walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB2"], ["tmp1"]))

        walked[1][-1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is listed in dirs but must be
                # removed like a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000928
Charles-François Natali7372b062012-02-05 15:15:38 +0100929
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, directory, topdown=True, follow_symlinks=False):
        """Yield (root, dirs, files) triples from os.fwalk(), dropping the fd."""
        walk_it = os.fwalk(directory,
                           topdown=topdown,
                           follow_symlinks=follow_symlinks)
        for root, dirs, files, root_fd in walk_it:
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied before being updated, so the
        caller's dicts are left untouched.  Every combination of
        topdown and symlink-following is checked.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            # Sets are compared because the listing order is not specified.
            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not trip over an unbound name.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Entries in dirs may be symlinks to directories; lstat-style
                # lookup tells real directories apart from links.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
1011
1012
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion fails, so a
        # failure here cannot leak into later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even when an assertion fails; otherwise tearDown
        # would be handed a regular file instead of a directory.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1091
Andrew Svetlov405faed2012-12-25 12:18:09 +02001092
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a scratch directory."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        # Named 'st' rather than 'stat' so the imported stat module is not
        # shadowed inside this method.
        st = os.stat(support.TESTFN)
        uid = st.st_uid
        gid = st.st_gid
        # Non-integer numeric types must be rejected for both uid and gid.
        for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        os.chown(support.TESTFN, uid, gid_1)
        gid = os.stat(support.TESTFN).st_gid
        self.assertEqual(gid, gid_1)
        os.chown(support.TESTFN, uid, gid_2)
        gid = os.stat(support.TESTFN).st_gid
        self.assertEqual(gid, gid_2)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        os.chown(support.TESTFN, uid_1, gid)
        uid = os.stat(support.TESTFN).st_uid
        self.assertEqual(uid, uid_1)
        os.chown(support.TESTFN, uid_2, gid)
        uid = os.stat(support.TESTFN).st_uid
        self.assertEqual(uid, uid_2)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls share one assertRaises block on purpose: the test
        # passes as soon as either call raises PermissionError.
        # NOTE(review): presumably one of the two uids may be the current
        # user, for which chown would succeed -- confirm before splitting.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1145
1146
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs()."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira/dirb and return the paths (dira, dirb).
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        return dira, dirb

    def test_remove_all(self):
        # Every directory is empty, so the whole chain is removed.
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertFalse(os.path.exists(dira))
        self.assertFalse(os.path.exists(support.TESTFN))

    def test_remove_partial(self):
        # A file in dira stops the pruning after dirb has been removed.
        dira, dirb = self._make_nested_dirs()
        with open(os.path.join(dira, 'file.txt'), 'w') as f:
            f.write('text')
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(support.TESTFN))

    def test_remove_nothing(self):
        # The leaf directory itself is not empty: removedirs() must raise
        # and leave everything in place.
        dira, dirb = self._make_nested_dirs()
        with open(os.path.join(dirb, 'file.txt'), 'w') as f:
            f.write('text')
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        self.assertTrue(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(support.TESTFN))
1188
1189
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        # Writes to the null device are accepted; the context manager
        # closes the file, so no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        # Reads from the null device return end-of-file immediately.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001197
Andrew Svetlov405faed2012-12-25 12:18:09 +02001198
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        # Two independent 16-byte draws are expected to differ.
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() output from a child process."""
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Check against the requested size rather than a hard-coded 16 so
        # the helper is correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001227
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001228
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001235
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it reads from a file descriptor."""

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the child's successful exit matters here; its output is
        # not inspected.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        with open(support.TESTFN, 'wb') as f:
            f.write(b"x" * 256)
        self.addCleanup(os.unlink, support.TESTFN)
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        # The two draws must differ; if urandom() had read from the
        # substituted file they would both be b"xxxx".
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1308
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001309
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001310@contextlib.contextmanager
1311def _execvpe_mockup(defpath=None):
1312 """
1313 Stubs out execv and execve functions when used as context manager.
1314 Records exec calls. The mock execv and execve functions always raise an
1315 exception as they would normally never return.
1316 """
1317 # A list of tuples containing (function name, first arg, args)
1318 # of calls to execv or execve that have been made.
1319 calls = []
1320
1321 def mock_execv(name, *args):
1322 calls.append(('execv', name, args))
1323 raise RuntimeError("execv called")
1324
1325 def mock_execve(name, *args):
1326 calls.append(('execve', name, args))
1327 raise OSError(errno.ENOTDIR, "execve called")
1328
1329 try:
1330 orig_execv = os.execv
1331 orig_execve = os.execve
1332 orig_defpath = os.defpath
1333 os.execv = mock_execv
1334 os.execve = mock_execve
1335 if defpath is not None:
1336 os.defpath = defpath
1337 yield calls
1338 finally:
1339 os.execv = orig_execv
1340 os.execve = orig_execve
1341 os.defpath = orig_defpath
1342
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        self.assertRaises(OSError, os.execvpe, 'no such app-',
                          ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Shared body for the str and bytes variants; *test_type* is
        # either str or bytes and selects the type of the program name.
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # native_fullpath is the form expected in the recorded execve
            # call: fs-encoded bytes except on Windows.
            if os.name != "nt":
                native_fullpath = os.fsencode(fullpath)
            else:
                native_fullpath = fullpath
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            self.assertRaises(RuntimeError,
                              os._execvpe, fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(calls[0],
                ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            if test_type is bytes:
                env_path[b'PATH'] = program_path
            else:
                env_path['PATH'] = program_path
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(calls[0],
                ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only run where os.name is not "nt".
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001406
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001407
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os calls on support.TESTFN raise OSError on Windows."""

    def test_rename(self):
        backup_name = support.TESTFN + ".bak"
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, backup_name)

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir is attempted while a regular file of the same name is
        # still open; the file is closed and removed afterwards.
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)
        finally:
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001432
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001433class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001434 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001435 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1436 #singles.append("close")
1437 #We omit close because it doesn'r raise an exception on some platforms
1438 def get_single(f):
1439 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001440 if hasattr(os, f):
1441 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001442 return helper
1443 for f in singles:
1444 locals()["test_"+f] = get_single(f)
1445
Benjamin Peterson7522c742009-01-19 21:00:09 +00001446 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001447 try:
1448 f(support.make_bad_fd(), *args)
1449 except OSError as e:
1450 self.assertEqual(e.errno, errno.EBADF)
1451 else:
1452 self.fail("%r didn't raise a OSError with a bad file descriptor"
1453 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001454
Serhiy Storchaka43767632013-11-03 21:31:38 +02001455 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001456 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001457 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001458
Serhiy Storchaka43767632013-11-03 21:31:38 +02001459 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001460 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001461 fd = support.make_bad_fd()
1462 # Make sure none of the descriptors we are about to close are
1463 # currently valid (issue 6542).
1464 for i in range(10):
1465 try: os.fstat(fd+i)
1466 except OSError:
1467 pass
1468 else:
1469 break
1470 if i < 2:
1471 raise unittest.SkipTest(
1472 "Unable to acquire a range of invalid file descriptors")
1473 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001474
Serhiy Storchaka43767632013-11-03 21:31:38 +02001475 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001476 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001477 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001478
Serhiy Storchaka43767632013-11-03 21:31:38 +02001479 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001480 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001481 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001482
Serhiy Storchaka43767632013-11-03 21:31:38 +02001483 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001484 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001485 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001486
Serhiy Storchaka43767632013-11-03 21:31:38 +02001487 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001488 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001489 self.check(os.pathconf, "PC_NAME_MAX")
1490 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001491
Serhiy Storchaka43767632013-11-03 21:31:38 +02001492 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001493 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001494 self.check(os.truncate, 0)
1495 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001496
Serhiy Storchaka43767632013-11-03 21:31:38 +02001497 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001498 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001499 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001500
Serhiy Storchaka43767632013-11-03 21:31:38 +02001501 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001502 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001503 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001504
Victor Stinner57ddf782014-01-08 15:21:28 +01001505 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1506 def test_readv(self):
1507 buf = bytearray(10)
1508 self.check(os.readv, [buf])
1509
Serhiy Storchaka43767632013-11-03 21:31:38 +02001510 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001511 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001512 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001513
Serhiy Storchaka43767632013-11-03 21:31:38 +02001514 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001515 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001516 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001517
Victor Stinner57ddf782014-01-08 15:21:28 +01001518 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1519 def test_writev(self):
1520 self.check(os.writev, [b'abc'])
1521
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001522 def test_inheritable(self):
1523 self.check(os.get_inheritable)
1524 self.check(os.set_inheritable, True)
1525
1526 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1527 'needs os.get_blocking() and os.set_blocking()')
1528 def test_blocking(self):
1529 self.check(os.get_blocking)
1530 self.check(os.set_blocking, True)
1531
Brian Curtin1b9df392010-11-24 20:24:31 +00001532
1533class LinkTests(unittest.TestCase):
1534 def setUp(self):
1535 self.file1 = support.TESTFN
1536 self.file2 = os.path.join(support.TESTFN + "2")
1537
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001538 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001539 for file in (self.file1, self.file2):
1540 if os.path.exists(file):
1541 os.unlink(file)
1542
Brian Curtin1b9df392010-11-24 20:24:31 +00001543 def _test_link(self, file1, file2):
1544 with open(file1, "w") as f1:
1545 f1.write("test")
1546
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001547 with warnings.catch_warnings():
1548 warnings.simplefilter("ignore", DeprecationWarning)
1549 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001550 with open(file1, "r") as f1, open(file2, "r") as f2:
1551 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1552
1553 def test_link(self):
1554 self._test_link(self.file1, self.file2)
1555
1556 def test_link_bytes(self):
1557 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1558 bytes(self.file2, sys.getfilesystemencoding()))
1559
Brian Curtinf498b752010-11-30 15:54:04 +00001560 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001561 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001562 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001563 except UnicodeError:
1564 raise unittest.SkipTest("Unable to encode for this platform.")
1565
Brian Curtinf498b752010-11-30 15:54:04 +00001566 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001567 self.file2 = self.file1 + "2"
1568 self._test_link(self.file1, self.file2)
1569
Serhiy Storchaka43767632013-11-03 21:31:38 +02001570@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1571class PosixUidGidTests(unittest.TestCase):
1572 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1573 def test_setuid(self):
1574 if os.getuid() != 0:
1575 self.assertRaises(OSError, os.setuid, 0)
1576 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001577
Serhiy Storchaka43767632013-11-03 21:31:38 +02001578 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1579 def test_setgid(self):
1580 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1581 self.assertRaises(OSError, os.setgid, 0)
1582 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001583
Serhiy Storchaka43767632013-11-03 21:31:38 +02001584 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1585 def test_seteuid(self):
1586 if os.getuid() != 0:
1587 self.assertRaises(OSError, os.seteuid, 0)
1588 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001589
Serhiy Storchaka43767632013-11-03 21:31:38 +02001590 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1591 def test_setegid(self):
1592 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1593 self.assertRaises(OSError, os.setegid, 0)
1594 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001595
Serhiy Storchaka43767632013-11-03 21:31:38 +02001596 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1597 def test_setreuid(self):
1598 if os.getuid() != 0:
1599 self.assertRaises(OSError, os.setreuid, 0, 0)
1600 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1601 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001602
Serhiy Storchaka43767632013-11-03 21:31:38 +02001603 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1604 def test_setreuid_neg1(self):
1605 # Needs to accept -1. We run this in a subprocess to avoid
1606 # altering the test runner's process state (issue8045).
1607 subprocess.check_call([
1608 sys.executable, '-c',
1609 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001610
Serhiy Storchaka43767632013-11-03 21:31:38 +02001611 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1612 def test_setregid(self):
1613 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1614 self.assertRaises(OSError, os.setregid, 0, 0)
1615 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1616 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001617
Serhiy Storchaka43767632013-11-03 21:31:38 +02001618 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1619 def test_setregid_neg1(self):
1620 # Needs to accept -1. We run this in a subprocess to avoid
1621 # altering the test runner's process state (issue8045).
1622 subprocess.check_call([
1623 sys.executable, '-c',
1624 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001625
Serhiy Storchaka43767632013-11-03 21:31:38 +02001626@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1627class Pep383Tests(unittest.TestCase):
1628 def setUp(self):
1629 if support.TESTFN_UNENCODABLE:
1630 self.dir = support.TESTFN_UNENCODABLE
1631 elif support.TESTFN_NONASCII:
1632 self.dir = support.TESTFN_NONASCII
1633 else:
1634 self.dir = support.TESTFN
1635 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001636
Serhiy Storchaka43767632013-11-03 21:31:38 +02001637 bytesfn = []
1638 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001639 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001640 fn = os.fsencode(fn)
1641 except UnicodeEncodeError:
1642 return
1643 bytesfn.append(fn)
1644 add_filename(support.TESTFN_UNICODE)
1645 if support.TESTFN_UNENCODABLE:
1646 add_filename(support.TESTFN_UNENCODABLE)
1647 if support.TESTFN_NONASCII:
1648 add_filename(support.TESTFN_NONASCII)
1649 if not bytesfn:
1650 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001651
Serhiy Storchaka43767632013-11-03 21:31:38 +02001652 self.unicodefn = set()
1653 os.mkdir(self.dir)
1654 try:
1655 for fn in bytesfn:
1656 support.create_empty_file(os.path.join(self.bdir, fn))
1657 fn = os.fsdecode(fn)
1658 if fn in self.unicodefn:
1659 raise ValueError("duplicate filename")
1660 self.unicodefn.add(fn)
1661 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001662 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001663 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001664
Serhiy Storchaka43767632013-11-03 21:31:38 +02001665 def tearDown(self):
1666 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001667
Serhiy Storchaka43767632013-11-03 21:31:38 +02001668 def test_listdir(self):
1669 expected = self.unicodefn
1670 found = set(os.listdir(self.dir))
1671 self.assertEqual(found, expected)
1672 # test listdir without arguments
1673 current_directory = os.getcwd()
1674 try:
1675 os.chdir(os.sep)
1676 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1677 finally:
1678 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001679
Serhiy Storchaka43767632013-11-03 21:31:38 +02001680 def test_open(self):
1681 for fn in self.unicodefn:
1682 f = open(os.path.join(self.dir, fn), 'rb')
1683 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001684
Serhiy Storchaka43767632013-11-03 21:31:38 +02001685 @unittest.skipUnless(hasattr(os, 'statvfs'),
1686 "need os.statvfs()")
1687 def test_statvfs(self):
1688 # issue #9645
1689 for fn in self.unicodefn:
1690 # should not fail with file not found error
1691 fullname = os.path.join(self.dir, fn)
1692 os.statvfs(fullname)
1693
1694 def test_stat(self):
1695 for fn in self.unicodefn:
1696 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001697
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        """Spawn an interpreter, wait until it reports in, then kill it.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready.  When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting.  This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1 seconds apart, for the marker.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop ends without seeing the marker.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a handler script and expect exit.

        *name* is only used in the failure message.  The child script is
        given the tag name of a one-byte shared mmap; the parent waits for
        that byte to become 1 before sending the event.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test ends, whatever its outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # status of 0 also means the process has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it.  This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1812
1813
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0/SUB1 directories and FILE0/FILE1
        # files, remembering the bare names in sorted order.
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            file_path = os.path.join(support.TESTFN, file_name)
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)
        # unicode
        listed = sorted(os.listdir('\\\\?\\' + absolute))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(b'\\\\?\\' + os.fsencode(absolute)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)
1858
1859
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic links on Windows."""

    # Link names are relative, so the links are created in the current
    # working directory; the targets are this file and its directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: both targets exist and no link is left over.
        assert os.path.exists(self.dirlink_target)
        assert os.path.exists(self.filelink_target)
        assert not os.path.exists(self.dirlink)
        assert not os.path.exists(self.filelink)
        assert not os.path.exists(self.missing_link)

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists: the missing link has no target, so exists() is not used.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat(link) equals stat(target) and differs from lstat.

        The same comparison is repeated with the link name given as bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        """Check os.stat() through a relative symlink from several dirs."""
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                # Leave the tree before it is removed below.
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Removing the tree also removes file1 and the link.  Guard the
            # call so that an early failure (e.g. the first mkdir) is not
            # masked by a second error raised during cleanup.
            if os.path.lexists(level1):
                shutil.rmtree(level1)
1977
Brian Curtind40e6f72010-07-08 21:39:08 +00001978
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junctions created with _winapi.CreateJunction()."""

    # The junction name is relative; its target is this file's directory.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Preconditions: the target exists and no junction is left over.
        assert os.path.exists(self.junction_target)
        assert not os.path.exists(self.junction)

    def tearDown(self):
        if not os.path.exists(self.junction):
            return
        # os.rmdir delegates to Windows' RemoveDirectoryW,
        # which removes junction points safely.
        os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        for predicate in (os.path.exists, os.path.isdir):
            self.assertTrue(predicate(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        still_there = os.path.exists(self.junction)
        self.assertFalse(still_there)
2008
2009
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Tests for symlinks whose target is given relative to the link."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        # The docstring is raw so that the backslash in the diagram is not
        # read as an (invalid) escape sequence.
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion is used instead of a bare assert statement,
        # which is stripped when Python runs with -O.
        self.assertTrue(os.path.isdir(src))
2040
2041
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode and str given to fsdecode come back as is.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Names the filesystem encoding cannot represent are skipped.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002055
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002056
Brett Cannonefb00c02012-02-29 18:31:31 -05002057
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win')
            or (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))
        ),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # With a tty on fd 0 the reported encoding must be a known codec.
        encoding = os.device_encoding(0)
        self.assertIsNotNone(encoding)
        codec_info = codecs.lookup(encoding)
        self.assertTrue(codec_info)
2071
2072
class PidTests(unittest.TestCase):
    """Checks for the process-id related functions of the os module."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        argv = [sys.executable, '-c', 'pass']
        child_pid = os.spawnv(os.P_NOWAIT, argv[0], argv)
        # The child runs "pass", so waitpid() must report that same pid
        # together with a zero status.
        outcome = os.waitpid(child_pid, 0)
        self.assertEqual(outcome, (child_pid, 0))
2088
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002089
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check for os.getlogin(); unconditionally skipped by the decorator."""

    def test_getlogin(self):
        # Whatever login name is reported, it must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2098
2099
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Bump our own priority value by one and read it back.
        original = os.getpriority(os.PRIO_PROCESS, os.getpid())
        os.setpriority(os.PRIO_PROCESS, os.getpid(), original + 1)
        try:
            observed = os.getpriority(os.PRIO_PROCESS, os.getpid())
            if original >= 19 and observed <= 19:
                # Starting at 19 or above, an observed value of 19 or less
                # cannot confirm that the increment took effect.
                self.skipTest(
                    "unable to reliably test setpriority at current nice level of %s" % original)
            self.assertEqual(observed, original + 1)
        finally:
            # Best effort: put the starting value back.  EACCES is tolerated,
            # any other failure is reported.
            try:
                os.setpriority(os.PRIO_PROCESS, os.getpid(), original)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
2122
2123
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket driven by an asyncore loop in its own thread.

        Only defined when the threading module is available.  The most
        recently accepted connection is wrapped in a Handler and kept in
        ``handler_instance`` so a test can inspect what was received.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that records every chunk it receives."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                # Received chunks, in arrival order; joined by get_data().
                self.in_buffer = []
                # Set to True by handle_close(); polled by the server's wait().
                self.closed = False
                # Greeting pushed as soon as the channel is created.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                # Store up to 4096 bytes per read event.
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Bare raise: propagate the active exception unchanged.
                raise

        def __init__(self, address):
            """Bind to *address*, start listening and record host and port."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Read back the bound address so callers can learn the port.
            self.host, self.port = self.socket.getsockname()[:2]
            # Set by handle_accept() once a client has connected.
            self.handler_instance = None
            # True between run() starting and stop() being called.
            self._active = False
            # Held around each single asyncore loop iteration in run().
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True while the serving loop is flagged as active."""
            return self._active

        def start(self):
            """Start the thread and block until run() has signalled startup."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            # run() sets this event right after marking the server active.
            self.__flag.wait()

        def stop(self):
            """Clear the active flag and join the serving thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            # (getattr with a default also covers handler_instance being None)
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            """Thread body: poll asyncore until stopped or no sockets remain."""
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # One short loop iteration at a time so the active flag is
                # re-checked between iterations.
                self._active_lock.acquire()
                asyncore.loop(timeout=0.001, count=1)
                self._active_lock.release()
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        # A read event on this dispatcher is handled like a connect: close.
        handle_read = handle_connect

        def writable(self):
            # Always false: this dispatcher never reports itself as writable.
            return 0

        def handle_error(self):
            # Bare raise: propagate the active exception unchanged.
            raise
2207
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002208
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer.

    setUpClass() writes DATA to support.TESTFN once.  Every test then gets a
    fresh server and a connected client socket, plus a read-only handle on
    that file (see setUp()).
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # Platforms matching none of these prefixes are treated as accepting the
    # headers/trailers/flags arguments of os.sendfile().
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
                               not sys.platform.startswith("solaris") and \
                               not sys.platform.startswith("sunos")
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        # Tests that call server.wait() have already stopped the server.
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Calls os.sendfile() and returns its result.  The call is retried
        while it fails with EAGAIN or EBUSY; any other OSError (ECONNRESET
        included) propagates.  *headers* and *trailers* are only passed on
        when SUPPORT_HEADERS_TRAILERS is true.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare lengths first so a size mismatch is reported concisely.
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # A negative offset must be rejected with EINVAL.
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ("in" is a reserved word, hence the dict unpacking)
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while 1:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                                offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"
        # Register the cleanup before the file is created, so it is removed
        # even if a later step fails.  support.unlink() does not complain
        # when the file was never created.
        self.addCleanup(support.unlink, TESTFN2)
        with open(TESTFN2, 'wb') as f:
            f.write(file_data)
        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # EBUSY and EAGAIN are tolerated here; any other error is a failure.
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002395
2396
def supports_extended_attributes():
    """Return True if extended attributes are usable for these tests.

    The answer is False when os.setxattr is missing or when setting a
    "user.test" attribute on a scratch file fails with OSError.  It is also
    False on 2.6.x kernels older than 2.6.39.  The scratch file is always
    removed again.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string: "\d" is an invalid escape in a plain literal.  The dots are
    # escaped so that only a literal "2.6." prefix matches.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2412
2413
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise os.getxattr/setxattr/removexattr/listxattr on support.TESTFN."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run one fixed scenario against the four given callables.

        *s* converts an attribute name given as str into the type under test
        (see _check_xattrs()).  *kwargs* are forwarded to getxattr, setxattr
        and removexattr, but not to listxattr.
        """
        fn = support.TESTFN
        open(fn, "wb").close()
        # Reading an attribute that was never set fails with ENODATA.
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)
        # Remember whatever attributes the fresh file already carries.
        init_xattr = listxattr(fn)
        self.assertIsInstance(init_xattr, list)
        setxattr(fn, s("user.test"), b"", **kwargs)
        # xattr tracks the set of names expected from listxattr().
        xattr = set(init_xattr)
        xattr.add("user.test")
        self.assertEqual(set(listxattr(fn)), xattr)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        # XATTR_REPLACE succeeds on an existing attribute ...
        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
        # ... while XATTR_CREATE on an existing one fails with EEXIST,
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.EEXIST)
        # and XATTR_REPLACE on a missing one fails with ENODATA.
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)
        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        xattr.add("user.test2")
        self.assertEqual(set(listxattr(fn)), xattr)
        # After removal the attribute is gone, both for reads and listings.
        removexattr(fn, s("user.test"), **kwargs)
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"), **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENODATA)
        xattr.remove("user.test")
        self.assertEqual(set(listxattr(fn)), xattr)
        self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
        # A 1024-byte value round-trips unchanged.
        setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
        removexattr(fn, s("user.test"), **kwargs)
        # Set 100 attributes; the names are passed as str, not through s().
        many = sorted("user.test{}".format(i) for i in range(100))
        for thing in many:
            setxattr(fn, thing, b"x", **kwargs)
        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario with str names, then again with bytes names."""
        def make_bytes(s):
            return bytes(s, "ascii")
        self._check_xattrs_str(str, *args, **kwargs)
        # Start the second pass from a fresh file.
        support.unlink(support.TESTFN)
        self._check_xattrs_str(make_bytes, *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but each operation opens the path and hands the
        # file descriptor to the os function instead of the path.
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)
        def setxattr(path, *args):
            with open(path, "wb") as fp:
                os.setxattr(fp.fileno(), *args)
        def removexattr(path, *args):
            with open(path, "wb") as fp:
                os.removexattr(fp.fileno(), *args)
        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)
        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2489
2490
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Each listed call must raise DeprecationWarning for a bytes filename."""

    def test_deprecated(self):
        import nt
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            # Turn the warning into an exception so it can be asserted on.
            warnings.simplefilter("error", DeprecationWarning)
            calls = (
                (nt._getfullpathname, bytes_name),
                (nt._isdir, bytes_name),
                (os.access, bytes_name, os.R_OK),
                (os.chdir, bytes_name),
                (os.chmod, bytes_name, 0o777),
                (os.getcwdb,),
                (os.link, bytes_name, bytes_name),
                (os.listdir, bytes_name),
                (os.lstat, bytes_name),
                (os.mkdir, bytes_name),
                (os.open, bytes_name, os.O_RDONLY),
                (os.rename, bytes_name, bytes_name),
                (os.rmdir, bytes_name),
                (os.startfile, bytes_name),
                (os.stat, bytes_name),
                (os.unlink, bytes_name),
                (os.utime, bytes_name),
            )
            for func, *args in calls:
                with self.assertRaises(DeprecationWarning):
                    func(*args)

    @support.skip_unless_symlink
    def test_symlink(self):
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(bytes_name, bytes_name)
2526
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002527
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _skip_if_no_terminal(self, exc):
        # Skip the running test for failures that only mean "no usable
        # terminal"; for anything else return so the caller can re-raise.
        if sys.platform == "win32" or exc.errno in (errno.EINVAL, errno.ENOTTY):
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            dims = os.get_terminal_size()
        except OSError as e:
            self._skip_if_no_terminal(e)
            raise

        self.assertGreaterEqual(dims.columns, 0)
        self.assertGreaterEqual(dims.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            fields = subprocess.check_output(['stty', 'size']).decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        # The fields are swapped to match the (columns, lines) order of
        # get_terminal_size().
        expected = (int(fields[1]), int(fields[0]))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as e:
            self._skip_if_no_terminal(e)
            raise
        self.assertEqual(expected, actual)
2570
2571
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object the caller passed."""

    def setUp(self):
        class Str(str):
            pass

        # Text names: the plain name plus a str-subclass instance of it.
        unencodable = support.TESTFN_UNENCODABLE
        decoded = unencodable if unencodable is not None else support.TESTFN
        self.unicode_filenames = [decoded, Str(decoded)]

        # Bytes names: the plain bytes plus a memoryview over them.
        undecodable = support.TESTFN_UNDECODABLE
        if undecodable is not None:
            encoded = undecodable
        else:
            encoded = os.fsencode(support.TESTFN)
        self.bytes_filenames = [encoded, memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"
        every_name = self.filenames
        text_names = self.unicode_filenames
        byte_names = self.bytes_filenames

        # Each entry is (names to try, function, *extra positional args).
        cases = [
            (every_name, os.chdir,),
            (every_name, os.chmod, 0o777),
            (every_name, os.lstat,),
            (every_name, os.open, os.O_RDONLY),
            (every_name, os.rmdir,),
            (every_name, os.stat,),
            (every_name, os.unlink,),
        ]
        if on_windows:
            cases += [
                (byte_names, os.rename, b"dst"),
                (byte_names, os.replace, b"dst"),
                (text_names, os.rename, "dst"),
                (text_names, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (text_names, os.listdir,),
            ]
        else:
            cases += [
                (every_name, os.listdir,),
                (every_name, os.rename, "dst"),
                (every_name, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms, with their extra args.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                cases.append((every_name, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                cases.append((byte_names, os.link, b"dst"))
                cases.append((text_names, os.link, "dst"))
            else:
                cases.append((every_name, os.link, "dst"))
        if hasattr(os, "listxattr"):
            cases += [
                (every_name, os.listxattr,),
                (every_name, os.getxattr, "user.test"),
                (every_name, os.setxattr, "user.test", b'user'),
                (every_name, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            cases.append((every_name, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            readlink_names = text_names if on_windows else every_name
            cases.append((readlink_names, os.readlink,))

        for names, func, *extra_args in cases:
            for name in names:
                try:
                    func(name, *extra_args)
                except OSError as exc:
                    # Identity, not equality: the error must carry the
                    # caller's own object.
                    self.assertIs(exc.filename, name)
                else:
                    self.fail("No exception thrown by {}".format(func))
2668
class CPUCountTests(unittest.TestCase):
    """Check for os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            # None means the count is unknown, so there is nothing to check.
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2677
Victor Stinnerdaf45552013-08-28 00:53:59 +02002678
class FDInheritanceTests(unittest.TestCase):
    """Checks for the inheritable flag of file descriptors created via os."""

    def _open_this_file(self):
        # Open this source file read-only and have the descriptor closed
        # automatically when the test finishes.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        pipe_ends = os.pipe()
        for end in pipe_ends:
            self.addCleanup(os.close, end)
        for end in pipe_ends:
            self.assertEqual(os.get_inheritable(end), False)

    def test_dup(self):
        source_fd = self._open_this_file()

        copy_fd = os.dup(source_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertEqual(os.get_inheritable(copy_fd), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = self._open_this_file()

        # inheritable by default
        target_fd = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source_fd, target_fd)
            self.assertEqual(os.get_inheritable(target_fd), True)
        finally:
            os.close(target_fd)

        # force non-inheritable
        target_fd = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source_fd, target_fd, inheritable=False)
            self.assertEqual(os.get_inheritable(target_fd), False)
        finally:
            os.close(target_fd)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pty_fds = os.openpty()
        for end in pty_fds:
            self.addCleanup(os.close, end)
        for end in pty_fds:
            self.assertEqual(os.get_inheritable(end), False)
2761
2762
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Checks for os.get_blocking() and os.set_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor starts out blocking.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch it off and on again; each change must be readable back.
        for wanted in (False, True):
            os.set_blocking(fd, wanted)
            self.assertEqual(os.get_blocking(fd), wanted)
2776
2777
Yury Selivanov97e2e062014-09-26 12:33:06 -04002778
class ExportsTests(unittest.TestCase):
    """Checks on the names listed in os.__all__."""

    def test_os_all(self):
        # Both names must be advertised in os.__all__.
        for exported_name in ('open', 'walk'):
            self.assertIn(exported_name, os.__all__)
2783
2784
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the DirEntry objects it yields.

    Each test works inside a scratch directory (self.path) that setUp()
    creates and that a registered cleanup removes afterwards.
    """

    def setUp(self):
        self.path = os.path.realpath(support.TESTFN)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* in the scratch directory and return its full path.

        The file content is the six bytes b'python'.
        """
        filename = os.path.join(self.path, name)
        with open(filename, "wb") as fp:
            fp.write(b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return a {name: DirEntry} dict.

        *names* is the sorted list of entry names expected in the directory;
        the test fails if the actual content differs.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results are equal.

        When *skip_fields* is true, the "st_*" attributes are compared one by
        one and st_dev, st_ino and st_nlink are left out of the comparison;
        otherwise the two objects are compared as a whole.
        """
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return

        ignored = ("st_dev", "st_ino", "st_nlink")
        for attr in dir(stat1):
            if not attr.startswith("st_") or attr in ignored:
                continue
            self.assertEqual(getattr(stat1, attr),
                             getattr(stat2, attr),
                             (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check one DirEntry against the expectations and against os.stat().

        *is_dir* and *is_file* are the results expected from entry.is_dir()
        and entry.is_file() when symbolic links are followed; *is_symlink*
        is the result expected from entry.is_symlink().
        """
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Following symlinks: the entry must agree with os.stat() and with
        # the value the caller expects.  Checking the expectation as well
        # catches the case where both sides are wrong in the same way.
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))
        self.assertEqual(entry.is_symlink(), is_symlink)

        # Not following symlinks: compare with the lstat()-style result.
        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # On Windows st_dev, st_ino and st_nlink are left out of the
        # comparison (see assert_stat_equal), except for the followed stat of
        # a symlink, which is compared in full.
        # NOTE(review): presumably because DirEntry.stat() does not fill
        # these fields on Windows unless it has to follow a symlink --
        # confirm against the os.DirEntry.stat() documentation.
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        # Build one entry of each supported kind and check all of them.
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        # (name, is_dir, is_file, is_symlink); the rows must stay sorted by
        # name because get_entries() compares against a sorted listing.
        expected = [
            ('dir', True, False, False),
            ('file.txt', False, True, False),
        ]
        if link:
            expected.append(('link_file.txt', False, True, False))
        if symlink:
            expected.append(('symlink_dir', True, False, True))
            expected.append(('symlink_file.txt', False, True, True))

        entries = self.get_entries([row[0] for row in expected])
        for name, is_dir, is_file, is_symlink in expected:
            self.check_entry(entries[name], name, is_dir, is_file, is_symlink)

    def get_entry(self, name):
        """Return the only entry of the scratch directory.

        The test fails unless the directory holds exactly one entry and that
        entry is called *name*.
        """
        entries = list(os.scandir(self.path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self):
        """Create the default file and return its DirEntry."""
        filename = self.create_file()
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            names = sorted(entry.name for entry in os.scandir())
            self.assertEqual(names, [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_removed_dir(self):
        # Query a DirEntry after its directory has been removed.
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_removed_file(self):
        # Query a DirEntry after its file has been removed.
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises, so nothing after it runs.
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Removing the target turns the symlink into a broken one.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        if os.name == "nt":
            # On Windows, os.scandir(bytes) must raise an exception
            self.assertRaises(TypeError, os.scandir, b'.')
            return

        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        # A bytes path must produce bytes names and bytes paths.
        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1234, 1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)
3007
3008
# Script entry point: hand control to unittest when this file is executed
# directly rather than imported.
if __name__ == "__main__":
    unittest.main()