blob: bb717cc6614186049c77ca27212d4a0ea141d612 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner138adb82015-06-12 22:01:54 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner138adb82015-06-12 22:01:54 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner138adb82015-06-12 22:01:54 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import platform
19import re
Victor Stinner138adb82015-06-12 22:01:54 +020020import shutil
21import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000022import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import stat
Victor Stinner138adb82015-06-12 22:01:54 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinner138adb82015-06-12 22:01:54 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000032try:
33 import threading
34except ImportError:
35 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020036try:
37 import resource
38except ImportError:
39 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020040try:
41 import fcntl
42except ImportError:
43 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010044try:
45 import _winapi
46except ImportError:
47 _winapi = None
# Collect the gids of the groups the current user belongs to (``groups``)
# and the uids of all known users (``all_users``).  Both fall back to an
# empty list on platforms without the grp/pwd modules.
try:
    import grp
    # Look the login name up once instead of once per group entry.
    _login_name = getpass.getuser()
    groups = [g.gr_gid for g in grp.getgrall() if _login_name in g.gr_mem]
    if hasattr(os, 'getgid'):
        # Make sure the process's own gid is listed even if the scan
        # above did not pick it up.
        process_gid = os.getgid()
        if process_gid not in groups:
            groups.append(process_gid)
except ImportError:
    groups = []
try:
    import pwd
    all_users = [u.pw_uid for u in pwd.getpwall()]
except ImportError:
    all_users = []
62try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020064except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020065 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020066
Berker Peksagce643912015-05-06 06:33:17 +030067from test.support.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000068
# True only when the platform exposes geteuid() and we run as uid 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84
Thomas Wouters0e3f5912006-08-11 14:57:12 +000085# Tests creating TESTFN
86class FileTests(unittest.TestCase):
87 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +000088 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000089 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000090 tearDown = setUp
91
92 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000093 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000094 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000095 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000096
    def test_closerange(self):
        # Check that os.closerange() closes an open descriptor and quietly
        # ignores one that is already closed.
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            # Keep closing the lower fd and dup'ing the upper one until the
            # pair is adjacent; give up after more than 10 attempts.
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            # Only ``first`` stays open for the closerange() call below.
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        # ``first`` must now be closed, so writing to it has to fail.
        self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000117
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000118 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000119 def test_rename(self):
120 path = support.TESTFN
121 old = sys.getrefcount(path)
122 self.assertRaises(TypeError, os.rename, path, 0)
123 new = sys.getrefcount(path)
124 self.assertEqual(old, new)
125
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000126 def test_read(self):
127 with open(support.TESTFN, "w+b") as fobj:
128 fobj.write(b"spam")
129 fobj.flush()
130 fd = fobj.fileno()
131 os.lseek(fd, 0, 0)
132 s = os.read(fd, 4)
133 self.assertEqual(type(s), bytes)
134 self.assertEqual(s, b"spam")
135
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200136 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200137 # Skip the test on 32-bit platforms: the number of bytes must fit in a
138 # Py_ssize_t type
139 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
140 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200141 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
142 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200143 with open(support.TESTFN, "wb") as fp:
144 fp.write(b'test')
145 self.addCleanup(support.unlink, support.TESTFN)
146
147 # Issue #21932: Make sure that os.read() does not raise an
148 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200149 with open(support.TESTFN, "rb") as fp:
150 data = os.read(fp.fileno(), size)
151
152 # The test does not try to read more than 2 GB at once because the
153 # operating system is free to return less bytes than requested.
154 self.assertEqual(data, b'test')
155
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000156 def test_write(self):
157 # os.write() accepts bytes- and buffer-like objects but not strings
158 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
159 self.assertRaises(TypeError, os.write, fd, "beans")
160 os.write(fd, b"bacon\n")
161 os.write(fd, bytearray(b"eggs\n"))
162 os.write(fd, memoryview(b"spam\n"))
163 os.close(fd)
164 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000165 self.assertEqual(fobj.read().splitlines(),
166 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000167
Victor Stinnere0daff12011-03-20 23:36:35 +0100168 def write_windows_console(self, *args):
169 retcode = subprocess.call(args,
170 # use a new console to not flood the test output
171 creationflags=subprocess.CREATE_NEW_CONSOLE,
172 # use a shell to hide the console window (SW_HIDE)
173 shell=True)
174 self.assertEqual(retcode, 0)
175
176 @unittest.skipUnless(sys.platform == 'win32',
177 'test specific to the Windows console')
178 def test_write_windows_console(self):
179 # Issue #11395: the Windows console returns an error (12: not enough
180 # space error) on writing into stdout if stdout mode is binary and the
181 # length is greater than 66,000 bytes (or less, depending on heap
182 # usage).
183 code = "print('x' * 100000)"
184 self.write_windows_console(sys.executable, "-c", code)
185 self.write_windows_console(sys.executable, "-u", "-c", code)
186
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000187 def fdopen_helper(self, *args):
188 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200189 f = os.fdopen(fd, *args)
190 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000191
192 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200193 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
194 os.close(fd)
195
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000196 self.fdopen_helper()
197 self.fdopen_helper('r')
198 self.fdopen_helper('r', 100)
199
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100200 def test_replace(self):
201 TESTFN2 = support.TESTFN + ".2"
202 with open(support.TESTFN, 'w') as f:
203 f.write("1")
204 with open(TESTFN2, 'w') as f:
205 f.write("2")
206 self.addCleanup(os.unlink, TESTFN2)
207 os.replace(support.TESTFN, TESTFN2)
208 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
209 with open(TESTFN2, 'r') as f:
210 self.assertEqual(f.read(), "1")
211
Martin Panterbf19d162015-09-09 01:01:13 +0000212 def test_open_keywords(self):
213 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
214 dir_fd=None)
215 os.close(f)
216
217 def test_symlink_keywords(self):
218 symlink = support.get_attribute(os, "symlink")
219 try:
220 symlink(src='target', dst=support.TESTFN,
221 target_is_directory=False, dir_fd=None)
222 except (NotImplementedError, OSError):
223 pass # No OS support or unprivileged user
224
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200225
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000226# Test attributes on return values from os.*stat* family.
227class StatAttributeTests(unittest.TestCase):
228 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000229 os.mkdir(support.TESTFN)
230 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000231 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000232 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000233 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000234
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000235 def tearDown(self):
236 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000237 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000238
Serhiy Storchaka43767632013-11-03 21:31:38 +0200239 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000240 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000241 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000242
243 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000244 self.assertEqual(result[stat.ST_SIZE], 3)
245 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000246
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000247 # Make sure all the attributes are there
248 members = dir(result)
249 for name in dir(stat):
250 if name[:3] == 'ST_':
251 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000252 if name.endswith("TIME"):
253 def trunc(x): return int(x)
254 else:
255 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000256 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000257 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000258 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259
Larry Hastings6fe20b32012-04-19 15:07:49 -0700260 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700261 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700262 for name in 'st_atime st_mtime st_ctime'.split():
263 floaty = int(getattr(result, name) * 100000)
264 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700265 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700266
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000267 try:
268 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200269 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000270 except IndexError:
271 pass
272
273 # Make sure that assignment fails
274 try:
275 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200276 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000277 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000278 pass
279
280 try:
281 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200282 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000283 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284 pass
285
286 try:
287 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200288 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000289 except AttributeError:
290 pass
291
292 # Use the stat_result constructor with a too-short tuple.
293 try:
294 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200295 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000296 except TypeError:
297 pass
298
Ezio Melotti42da6632011-03-15 05:18:48 +0200299 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000300 try:
301 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
302 except TypeError:
303 pass
304
Antoine Pitrou38425292010-09-21 18:19:07 +0000305 def test_stat_attributes(self):
306 self.check_stat_attributes(self.fname)
307
308 def test_stat_attributes_bytes(self):
309 try:
310 fname = self.fname.encode(sys.getfilesystemencoding())
311 except UnicodeEncodeError:
312 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100313 with warnings.catch_warnings():
314 warnings.simplefilter("ignore", DeprecationWarning)
315 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000316
Christian Heimes25827622013-10-12 01:27:08 +0200317 def test_stat_result_pickle(self):
318 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200319 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
320 p = pickle.dumps(result, proto)
321 self.assertIn(b'stat_result', p)
322 if proto < 4:
323 self.assertIn(b'cos\nstat_result\n', p)
324 unpickled = pickle.loads(p)
325 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200326
Serhiy Storchaka43767632013-11-03 21:31:38 +0200327 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000328 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000329 try:
330 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000331 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000332 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000333 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200334 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000335
336 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000337 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000338
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000339 # Make sure all the attributes are there.
340 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
341 'ffree', 'favail', 'flag', 'namemax')
342 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000343 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000344
345 # Make sure that assignment really fails
346 try:
347 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200348 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000349 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000350 pass
351
352 try:
353 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200354 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000355 except AttributeError:
356 pass
357
358 # Use the constructor with a too-short tuple.
359 try:
360 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200361 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000362 except TypeError:
363 pass
364
Ezio Melotti42da6632011-03-15 05:18:48 +0200365 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000366 try:
367 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
368 except TypeError:
369 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000370
Christian Heimes25827622013-10-12 01:27:08 +0200371 @unittest.skipUnless(hasattr(os, 'statvfs'),
372 "need os.statvfs()")
373 def test_statvfs_result_pickle(self):
374 try:
375 result = os.statvfs(self.fname)
376 except OSError as e:
377 # On AtheOS, glibc always returns ENOSYS
378 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200379 self.skipTest('os.statvfs() failed with ENOSYS')
380
Serhiy Storchakabad12572014-12-15 14:03:42 +0200381 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
382 p = pickle.dumps(result, proto)
383 self.assertIn(b'statvfs_result', p)
384 if proto < 4:
385 self.assertIn(b'cos\nstatvfs_result\n', p)
386 unpickled = pickle.loads(p)
387 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200388
Serhiy Storchaka43767632013-11-03 21:31:38 +0200389 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
390 def test_1686475(self):
391 # Verify that an open file can be stat'ed
392 try:
393 os.stat(r"c:\pagefile.sys")
394 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600395 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200396 except OSError as e:
397 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000398
Serhiy Storchaka43767632013-11-03 21:31:38 +0200399 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
400 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
401 def test_15261(self):
402 # Verify that stat'ing a closed fd does not cause crash
403 r, w = os.pipe()
404 try:
405 os.stat(r) # should not raise error
406 finally:
407 os.close(r)
408 os.close(w)
409 with self.assertRaises(OSError) as ctx:
410 os.stat(r)
411 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100412
Zachary Ware63f277b2014-06-19 09:46:37 -0500413 def check_file_attributes(self, result):
414 self.assertTrue(hasattr(result, 'st_file_attributes'))
415 self.assertTrue(isinstance(result.st_file_attributes, int))
416 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
417
418 @unittest.skipUnless(sys.platform == "win32",
419 "st_file_attributes is Win32 specific")
420 def test_file_attributes(self):
421 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
422 result = os.stat(self.fname)
423 self.check_file_attributes(result)
424 self.assertEqual(
425 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
426 0)
427
428 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
429 result = os.stat(support.TESTFN)
430 self.check_file_attributes(result)
431 self.assertEqual(
432 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
433 stat.FILE_ATTRIBUTE_DIRECTORY)
434
Victor Stinner138adb82015-06-12 22:01:54 +0200435
436class UtimeTests(unittest.TestCase):
437 def setUp(self):
438 self.dirname = support.TESTFN
439 self.fname = os.path.join(self.dirname, "f1")
440
441 self.addCleanup(support.rmtree, self.dirname)
442 os.mkdir(self.dirname)
443 with open(self.fname, 'wb') as fp:
444 fp.write(b"ABC")
445
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200446 def restore_float_times(state):
447 with warnings.catch_warnings():
448 warnings.simplefilter("ignore", DeprecationWarning)
449
450 os.stat_float_times(state)
451
Victor Stinner138adb82015-06-12 22:01:54 +0200452 # ensure that st_atime and st_mtime are float
453 with warnings.catch_warnings():
454 warnings.simplefilter("ignore", DeprecationWarning)
455
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200456 old_float_times = os.stat_float_times(-1)
457 self.addCleanup(restore_float_times, old_float_times)
Victor Stinner138adb82015-06-12 22:01:54 +0200458
459 os.stat_float_times(True)
460
461 def support_subsecond(self, filename):
462 # Heuristic to check if the filesystem supports timestamp with
463 # subsecond resolution: check if float and int timestamps are different
464 st = os.stat(filename)
465 return ((st.st_atime != st[7])
466 or (st.st_mtime != st[8])
467 or (st.st_ctime != st[9]))
468
469 def _test_utime(self, set_time, filename=None):
470 if not filename:
471 filename = self.fname
472
473 support_subsecond = self.support_subsecond(filename)
474 if support_subsecond:
475 # Timestamp with a resolution of 1 microsecond (10^-6).
476 #
477 # The resolution of the C internal function used by os.utime()
478 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
479 # test with a resolution of 1 ns requires more work:
480 # see the issue #15745.
481 atime_ns = 1002003000 # 1.002003 seconds
482 mtime_ns = 4005006000 # 4.005006 seconds
483 else:
484 # use a resolution of 1 second
485 atime_ns = 5 * 10**9
486 mtime_ns = 8 * 10**9
487
488 set_time(filename, (atime_ns, mtime_ns))
489 st = os.stat(filename)
490
491 if support_subsecond:
492 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
493 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
494 else:
495 self.assertEqual(st.st_atime, atime_ns * 1e-9)
496 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
497 self.assertEqual(st.st_atime_ns, atime_ns)
498 self.assertEqual(st.st_mtime_ns, mtime_ns)
499
500 def test_utime(self):
501 def set_time(filename, ns):
502 # test the ns keyword parameter
503 os.utime(filename, ns=ns)
504 self._test_utime(set_time)
505
506 @staticmethod
507 def ns_to_sec(ns):
508 # Convert a number of nanosecond (int) to a number of seconds (float).
509 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
510 # issue, os.utime() rounds towards minus infinity.
511 return (ns * 1e-9) + 0.5e-9
512
513 def test_utime_by_indexed(self):
514 # pass times as floating point seconds as the second indexed parameter
515 def set_time(filename, ns):
516 atime_ns, mtime_ns = ns
517 atime = self.ns_to_sec(atime_ns)
518 mtime = self.ns_to_sec(mtime_ns)
519 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
520 # or utime(time_t)
521 os.utime(filename, (atime, mtime))
522 self._test_utime(set_time)
523
524 def test_utime_by_times(self):
525 def set_time(filename, ns):
526 atime_ns, mtime_ns = ns
527 atime = self.ns_to_sec(atime_ns)
528 mtime = self.ns_to_sec(mtime_ns)
529 # test the times keyword parameter
530 os.utime(filename, times=(atime, mtime))
531 self._test_utime(set_time)
532
533 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
534 "follow_symlinks support for utime required "
535 "for this test.")
536 def test_utime_nofollow_symlinks(self):
537 def set_time(filename, ns):
538 # use follow_symlinks=False to test utimensat(timespec)
539 # or lutimes(timeval)
540 os.utime(filename, ns=ns, follow_symlinks=False)
541 self._test_utime(set_time)
542
543 @unittest.skipUnless(os.utime in os.supports_fd,
544 "fd support for utime required for this test.")
545 def test_utime_fd(self):
546 def set_time(filename, ns):
547 with open(filename, 'wb') as fp:
548 # use a file descriptor to test futimens(timespec)
549 # or futimes(timeval)
550 os.utime(fp.fileno(), ns=ns)
551 self._test_utime(set_time)
552
553 @unittest.skipUnless(os.utime in os.supports_dir_fd,
554 "dir_fd support for utime required for this test.")
555 def test_utime_dir_fd(self):
556 def set_time(filename, ns):
557 dirname, name = os.path.split(filename)
558 dirfd = os.open(dirname, os.O_RDONLY)
559 try:
560 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
561 os.utime(name, dir_fd=dirfd, ns=ns)
562 finally:
563 os.close(dirfd)
564 self._test_utime(set_time)
565
566 def test_utime_directory(self):
567 def set_time(filename, ns):
568 # test calling os.utime() on a directory
569 os.utime(filename, ns=ns)
570 self._test_utime(set_time, filename=self.dirname)
571
572 def _test_utime_current(self, set_time):
573 # Get the system clock
574 current = time.time()
575
576 # Call os.utime() to set the timestamp to the current system clock
577 set_time(self.fname)
578
579 if not self.support_subsecond(self.fname):
580 delta = 1.0
581 else:
582 # On Windows, the usual resolution of time.time() is 15.6 ms
583 delta = 0.020
584 st = os.stat(self.fname)
585 msg = ("st_time=%r, current=%r, dt=%r"
586 % (st.st_mtime, current, st.st_mtime - current))
587 self.assertAlmostEqual(st.st_mtime, current,
588 delta=delta, msg=msg)
589
590 def test_utime_current(self):
591 def set_time(filename):
592 # Set to the current time in the new way
593 os.utime(self.fname)
594 self._test_utime_current(set_time)
595
596 def test_utime_current_old(self):
597 def set_time(filename):
598 # Set to the current time in the old explicit way.
599 os.utime(self.fname, None)
600 self._test_utime_current(set_time)
601
602 def get_file_system(self, path):
603 if sys.platform == 'win32':
604 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
605 import ctypes
606 kernel32 = ctypes.windll.kernel32
607 buf = ctypes.create_unicode_buffer("", 100)
608 ok = kernel32.GetVolumeInformationW(root, None, 0,
609 None, None, None,
610 buf, len(buf))
611 if ok:
612 return buf.value
613 # return None if the filesystem is unknown
614
615 def test_large_time(self):
616 # Many filesystems are limited to the year 2038. At least, the test
617 # pass with NTFS filesystem.
618 if self.get_file_system(self.dirname) != "NTFS":
619 self.skipTest("requires NTFS")
620
621 large = 5000000000 # some day in 2128
622 os.utime(self.fname, (large, large))
623 self.assertEqual(os.stat(self.fname).st_mtime, large)
624
625 def test_utime_invalid_arguments(self):
626 # seconds and nanoseconds parameters are mutually exclusive
627 with self.assertRaises(ValueError):
628 os.utime(self.fname, (5, 5), ns=(5, 5))
629
630
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000631from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000632
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000633class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000634 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000635 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000636
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000637 def setUp(self):
638 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000639 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000640 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000641 for key, value in self._reference().items():
642 os.environ[key] = value
643
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000644 def tearDown(self):
645 os.environ.clear()
646 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000647 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000648 os.environb.clear()
649 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000650
Christian Heimes90333392007-11-01 19:08:42 +0000651 def _reference(self):
652 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
653
654 def _empty_mapping(self):
655 os.environ.clear()
656 return os.environ
657
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000658 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300659 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000660 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000661 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300662 os.environ.update(HELLO="World")
663 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
664 value = popen.read().strip()
665 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000666
Ezio Melottic7e139b2012-09-26 20:01:34 +0300667 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000668 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300669 with os.popen(
670 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
671 it = iter(popen)
672 self.assertEqual(next(it), "line1\n")
673 self.assertEqual(next(it), "line2\n")
674 self.assertEqual(next(it), "line3\n")
675 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000676
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000677 # Verify environ keys and values from the OS are of the
678 # correct str type.
679 def test_keyvalue_types(self):
680 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000681 self.assertEqual(type(key), str)
682 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000683
Christian Heimes90333392007-11-01 19:08:42 +0000684 def test_items(self):
685 for key, value in self._reference().items():
686 self.assertEqual(os.environ.get(key), value)
687
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000688 # Issue 7310
689 def test___repr__(self):
690 """Check that the repr() of os.environ looks like environ({...})."""
691 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000692 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
693 '{!r}: {!r}'.format(key, value)
694 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000695
    def test_get_exec_path(self):
        # Check os.get_exec_path() with the default environment, with
        # explicit str environments and with bytes keys/values.
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily replace os.environ with a plain dict; the finally
        # clause puts the real object back.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                # Only reached when the mixed dict could be built.
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])
735 ['abc'])
736
737 @unittest.skipUnless(os.supports_bytes_environ,
738 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000739 def test_environb(self):
740 # os.environ -> os.environb
741 value = 'euro\u20ac'
742 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000743 value_bytes = value.encode(sys.getfilesystemencoding(),
744 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000745 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000746 msg = "U+20AC character is not encodable to %s" % (
747 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000748 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000749 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000750 self.assertEqual(os.environ['unicode'], value)
751 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000752
753 # os.environb -> os.environ
754 value = b'\xff'
755 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000756 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000757 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000758 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000759
Charles-François Natali2966f102011-11-26 11:32:46 +0100760 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
761 # #13415).
762 @support.requires_freebsd_version(7)
763 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100764 def test_unset_error(self):
765 if sys.platform == "win32":
766 # an environment variable is limited to 32,767 characters
767 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100768 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100769 else:
770 # "=" is not allowed in a variable name
771 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100772 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100773
Victor Stinner6d101392013-04-14 16:35:04 +0200774 def test_key_type(self):
775 missing = 'missingkey'
776 self.assertNotIn(missing, os.environ)
777
Victor Stinner839e5ea2013-04-14 16:43:03 +0200778 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200779 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200780 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200781 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200782
Victor Stinner839e5ea2013-04-14 16:43:03 +0200783 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200784 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200785 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200786 self.assertTrue(cm.exception.__suppress_context__)
787
Victor Stinner6d101392013-04-14 16:35:04 +0200788
class WalkTests(unittest.TestCase):
    """Tests for os.walk().

    Every test walks the tree through self.walk() so that a subclass can
    override that single method to run the same tests against another
    walking function.
    """

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # so that both functions can be tested with the same code base.
    def walk(self, directory, topdown=True, follow_symlinks=False):
        """Yield (root, dirs, files) triples produced by os.walk()."""
        walk_it = os.walk(directory,
                          topdown=topdown,
                          followlinks=follow_symlinks)
        for root, dirs, files in walk_it:
            yield (root, dirs, files)

    def setUp(self):
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # The context manager closes the file even if the write fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        # Walk top-down.  Go through self.walk() (not os.walk() directly) so
        # that subclasses overriding walk() exercise this test as well.
        entries = list(self.walk(self.walk_path))

        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        # "flipped" is a bool used as 0/1 in the index arithmetic below.
        flipped = entries[0][1][0] != "SUB1"
        entries[0][1].sort()
        entries[3 - 2 * flipped][-1].sort()
        self.assertEqual(entries[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(entries[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        # Prune the search.
        entries = []
        for root, dirs, files in self.walk(self.walk_path):
            entries.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to entries!
                dirs.remove('SUB1')

        self.assertEqual(len(entries), 2)
        self.assertEqual(entries[0],
                         (self.walk_path, ["SUB2"], ["tmp1"]))

        entries[1][-1].sort()
        self.assertEqual(entries[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        entries = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = entries[3][1][0] != "SUB1"
        entries[3][1].sort()
        entries[2 - 2 * flipped][-1].sort()
        self.assertEqual(entries[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(entries[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 - 2 * flipped],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is listed in dirs but has to be
                # removed like a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000928
Charles-François Natali7372b062012-02-05 15:15:38 +0100929
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, directory, topdown=True, follow_symlinks=False):
        """Yield (root, dirs, files) triples from os.fwalk(), dropping the fd."""
        walk_it = os.fwalk(directory,
                           topdown=topdown,
                           follow_symlinks=follow_symlinks)
        for root, dirs, files, root_fd in walk_it:
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then every combination of topdown
        and symlink-following is applied to os.walk() and os.fwalk(); each
        root reported by fwalk() must have been reported by walk() with the
        same sets of directory and file names.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide the real error behind a NameError on "fd".
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup: remove the tree bottom-up, addressing every entry
        # relative to the directory descriptor yielded by fwalk().
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Do not follow symlinks: a link to a directory has to be
                # unlinked, not rmdir'ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
1011
1012
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process-wide umask even when an assertion fails, so
        # that a failure here cannot leak into later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Always remove the file again: tearDown() relies on removedirs(),
        # which cannot delete a directory that still contains a file.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1091
Andrew Svetlov405faed2012-12-25 12:18:09 +02001092
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a directory shared by all tests."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target = support.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        # None of these numeric types is an integer, so each must be
        # rejected in both the uid and the gid position.
        non_integers = (-1.0, -1j, decimal.Decimal(-1),
                        fractions.Fraction(-2, 2))
        for value in non_integers:
            self.assertRaises(TypeError, os.chown, target, value, gid)
            self.assertRaises(TypeError, os.chown, target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        # Switch the group twice and read it back after every change.
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Switch the owner twice and read it back after every change.
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls sit inside the same block: PermissionError from either
        # one satisfies the check.
        # NOTE(review): presumably because one of the two uids may be the
        # current user, for whom chown succeeds -- confirm.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)
1145
1146
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on the nested tree TESTFN/dira/dirb."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        """Create TESTFN/dira and TESTFN/dira/dirb; return both paths."""
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def _add_text_file(self, directory):
        """Write a small file.txt into *directory* so it is not empty."""
        with open(os.path.join(directory, 'file.txt'), 'w') as f:
            f.write('text')

    def test_remove_all(self):
        # Nothing but empty directories: the whole chain disappears.
        outer, inner = self._make_nested_dirs()
        os.removedirs(inner)
        for gone in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the pruning right above dirb.
        outer, inner = self._make_nested_dirs()
        self._add_text_file(outer)
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for kept in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb itself makes the very first removal fail.
        outer, inner = self._make_nested_dirs()
        self._add_text_file(inner)
        self.assertRaises(OSError, os.removedirs, inner)
        for kept in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1188
1189
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull device."""

    def test_devnull(self):
        """os.devnull accepts writes and yields no data when read."""
        with open(os.devnull, 'wb') as sink:
            sink.write(b'hello')
            # Explicit close inside the with block, as in the original test.
            sink.close()
        with open(os.devnull, 'rb') as source:
            content = source.read()
            self.assertEqual(content, b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001197
Andrew Svetlov405faed2012-12-25 12:18:09 +02001198
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        # The result must contain exactly as many bytes as requested,
        # including the degenerate request for zero bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        # Two independent 16-byte draws are expected to differ.
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() output from a child process.

        The child writes the raw bytes to its stdout; the length of what
        was captured is checked against *count* before returning it.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare against the requested size rather than a fixed 16 so the
        # helper stays correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate interpreter processes must not produce the same data.
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001227
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001228
# Build configuration flags: each is True only when the corresponding
# sysconfig variable is set to 1 for this interpreter.
HAVE_GETENTROPY = sysconfig.get_config_var('HAVE_GETENTROPY') == 1
HAVE_GETRANDOM = sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001231
@unittest.skipIf(HAVE_GETENTROPY,
                 "getentropy() does not use a file descriptor")
@unittest.skipIf(HAVE_GETRANDOM,
                 "getrandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Each scenario runs in a child interpreter so that closing descriptors
    or lowering resource limits cannot disturb the test process itself.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        child_source = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', child_source)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        child_source = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only a successful exit matters here; the output is not inspected.
        _rc, _out, _err = assert_python_ok('-Sc', child_source)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        with open(support.TESTFN, 'wb') as decoy:
            decoy.write(b"x" * 256)
        self.addCleanup(os.unlink, support.TESTFN)
        child_source = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)

        # First run: two 4-byte draws that must differ from each other.
        _rc, first_out, _err = assert_python_ok('-Sc', child_source)
        self.assertEqual(len(first_out), 8)
        self.assertNotEqual(first_out[0:4], first_out[4:8])

        # Second run: its output must differ from the first run's output.
        _rc, second_out, _err = assert_python_ok('-Sc', child_source)
        self.assertEqual(len(second_out), 8)
        self.assertNotEqual(second_out, first_out)
1306
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001307
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001308@contextlib.contextmanager
1309def _execvpe_mockup(defpath=None):
1310 """
1311 Stubs out execv and execve functions when used as context manager.
1312 Records exec calls. The mock execv and execve functions always raise an
1313 exception as they would normally never return.
1314 """
1315 # A list of tuples containing (function name, first arg, args)
1316 # of calls to execv or execve that have been made.
1317 calls = []
1318
1319 def mock_execv(name, *args):
1320 calls.append(('execv', name, args))
1321 raise RuntimeError("execv called")
1322
1323 def mock_execve(name, *args):
1324 calls.append(('execve', name, args))
1325 raise OSError(errno.ENOTDIR, "execve called")
1326
1327 try:
1328 orig_execv = os.execv
1329 orig_execve = os.execve
1330 orig_defpath = os.defpath
1331 os.execv = mock_execv
1332 os.execve = mock_execve
1333 if defpath is not None:
1334 os.defpath = defpath
1335 yield calls
1336 finally:
1337 os.execv = orig_execv
1338 os.execve = orig_execve
1339 os.defpath = orig_defpath
1340
Guido van Rossume7ba4952007-06-06 23:52:48 +00001341class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001342 @unittest.skipIf(USING_LINUXTHREADS,
1343 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001344 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001345 self.assertRaises(OSError, os.execvpe, 'no such app-',
1346 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001347
Thomas Heller6790d602007-08-30 17:15:14 +00001348 def test_execvpe_with_bad_arglist(self):
1349 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1350
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001351 @unittest.skipUnless(hasattr(os, '_execvpe'),
1352 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001353 def _test_internal_execvpe(self, test_type):
1354 program_path = os.sep + 'absolutepath'
1355 if test_type is bytes:
1356 program = b'executable'
1357 fullpath = os.path.join(os.fsencode(program_path), program)
1358 native_fullpath = fullpath
1359 arguments = [b'progname', 'arg1', 'arg2']
1360 else:
1361 program = 'executable'
1362 arguments = ['progname', 'arg1', 'arg2']
1363 fullpath = os.path.join(program_path, program)
1364 if os.name != "nt":
1365 native_fullpath = os.fsencode(fullpath)
1366 else:
1367 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001368 env = {'spam': 'beans'}
1369
Victor Stinnerb745a742010-05-18 17:17:23 +00001370 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001371 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001372 self.assertRaises(RuntimeError,
1373 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001374 self.assertEqual(len(calls), 1)
1375 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1376
Victor Stinnerb745a742010-05-18 17:17:23 +00001377 # test os._execvpe() with a relative path:
1378 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001379 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001380 self.assertRaises(OSError,
1381 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001382 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001383 self.assertSequenceEqual(calls[0],
1384 ('execve', native_fullpath, (arguments, env)))
1385
1386 # test os._execvpe() with a relative path:
1387 # os.get_exec_path() reads the 'PATH' variable
1388 with _execvpe_mockup() as calls:
1389 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001390 if test_type is bytes:
1391 env_path[b'PATH'] = program_path
1392 else:
1393 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001394 self.assertRaises(OSError,
1395 os._execvpe, program, arguments, env=env_path)
1396 self.assertEqual(len(calls), 1)
1397 self.assertSequenceEqual(calls[0],
1398 ('execve', native_fullpath, (arguments, env_path)))
1399
def test_internal_execvpe_str(self):
    # Run the shared os._execvpe() scenario with str paths everywhere, and
    # additionally with bytes paths when os.name is not "nt".
    path_types = (str,) if os.name == "nt" else (str, bytes)
    for path_type in path_types:
        self._test_internal_execvpe(path_type)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001404
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001405
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that failing os calls on support.TESTFN raise OSError on Windows.

    Only test_mkdir creates support.TESTFN itself; the other tests call the
    function under test without creating the path first.
    """

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir must fail because a regular file of that name exists (and is
        # still open) while the call is made.
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)
        finally:
            # Close before unlinking so the file can actually be removed.
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001430
class TestInvalidFD(unittest.TestCase):
    """Call fd-taking os functions with a bad descriptor and expect EBADF."""

    # Functions called with the file descriptor as their only argument; a
    # test_<name> method is generated for each of them just below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    # "close" is deliberately not in the list: it doesn't raise an exception
    # on some platforms.

    def get_single(f):
        # Build the test method for os.<f>.  The attribute is looked up when
        # the test runs; if os has no such function nothing is checked.
        def helper(self):
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper

    # Install the generated methods directly into the class namespace.
    for f in singles:
        locals()["test_" + f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require an OSError whose errno is EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
            return
        self.fail("%r didn't raise a OSError with a bad file descriptor"
                  % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() reports False for a bad descriptor instead of raising.
        bad_fd = support.make_bad_fd()
        self.assertEqual(os.isatty(bad_fd), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).  After the loop, i is the offset of
        # the first descriptor that fstat() accepted, or 9 if none did.
        for i in range(10):
            try:
                os.fstat(fd + i)
            except OSError:
                continue
            break
        if i < 2:
            self.skipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i - 1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # Both the path-based and the fd-based entry point are given the fd.
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # Both the path-based and the fd-based entry point are given the fd.
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        self.check(os.readv, [bytearray(10)])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1529
Brian Curtin1b9df392010-11-24 20:24:31 +00001530
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        # Only the names are chosen here; _test_link() creates the files.
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it as file2 and check that both names
        # open the same underlying file.
        with open(file1, "w") as f1:
            f1.write("test")

        with warnings.catch_warnings():
            # Silence any DeprecationWarning raised while making the link
            # (the same helper is also called with bytes paths).
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding together with its
        # error handler, matching what the os functions themselves do.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Rebind the instance attributes so tearDown() removes these files.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1567
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Argument and permission checks for the os.set*uid()/set*gid() calls.

    Every test expects OverflowError for the id value 1<<32.  Switching to
    id 0 is expected to raise OSError when os.getuid() is non-zero (and, for
    the group calls, HAVE_WHEEL_GROUP is false as well).
    """

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1<<32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # The out-of-range value is rejected in either argument position.
        with self.assertRaises(OverflowError):
            os.setreuid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        code = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', code])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        # The out-of-range value is rejected in either argument position.
        with self.assertRaises(OverflowError):
            os.setregid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        code = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', code])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001623
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Access files with non-ASCII names through str paths.

    setUp() creates a directory holding one empty file per candidate name
    that os.fsencode() accepts; the tests then list, open and stat them.
    """

    def setUp(self):
        # Use the first of these names that is set (truthy).
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Candidate file names; the two optional ones are only used when set.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        # Keep only the names the filesystem encoding can represent.
        bytesfn = []
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so remove the
            # directory here before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        current_directory = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(current_directory)

    def test_open(self):
        # Opening must succeed; the file content is not inspected.
        for fn in self.unicodefn:
            with open(os.path.join(self.dir, fn), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, fn))

    def test_stat(self):
        for fn in self.unicodefn:
            fullname = os.path.join(self.dir, fn)
            os.stat(fullname)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001695
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows: exit codes and console control events."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1 second apart, for the ready message.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait until it
        # sets the shared byte to 1, send *event* to it and require that the
        # process has exited half a second later.  *name* is only used in
        # the failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test finishes, whatever the outcome.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached on timeout or when the child exited early; only a
            # child that is still running can (and needs to) be killed.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # status of 0 must not be mistaken for "did not stop".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1810
1811
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0, SUB1 (directories) and FILE0,
        # FILE1 (files); remember the sorted entry names for the tests.
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, file_name)
            # makedirs also creates support.TESTFN itself on the first pass.
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        expected = self.created_paths
        # unicode
        self.assertEqual(sorted(os.listdir(support.TESTFN)), expected)
        # bytes
        self.assertEqual(
            sorted(os.listdir(os.fsencode(support.TESTFN))),
            [os.fsencode(name) for name in expected])

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        base = os.path.abspath(support.TESTFN)
        expected = self.created_paths
        # unicode
        self.assertEqual(sorted(os.listdir('\\\\?\\' + base)), expected)
        # bytes
        self.assertEqual(
            sorted(os.listdir(b'\\\\?\\' + os.fsencode(base))),
            [os.fsencode(name) for name in expected])
1856
1857
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for file and directory symbolic links on Windows."""

    # Link names are relative, so links are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Sanity-check the fixture: both targets exist and no link name is
        # already taken.
        assert os.path.exists(self.dirlink_target)
        assert os.path.exists(self.filelink_target)
        assert not os.path.exists(self.dirlink)
        assert not os.path.exists(self.filelink)
        assert not os.path.exists(self.missing_link)

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is needed here: exists() is false for a dangling link.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() must follow the link to the target while lstat() must
        # describe the link itself; check with str and bytes link names.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # A relative symlink must be resolved relative to the directory that
        # contains it, whatever the current directory of the caller is.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so that cleanup never refers to an
        # unbound name; level1 is absolute, so this is a pure path operation.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from the parent of the link's dir
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a subdirectory of the link's dir
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Removing the tree also removes file1 and the link.  Skip it if
            # level1 was never created so the original failure stays visible.
            if os.path.isdir(level1):
                shutil.rmtree(level1)
1975
Brian Curtind40e6f72010-07-08 21:39:08 +00001976
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junctions created with _winapi.CreateJunction()."""

    # The junction name is relative; its target is this file's directory.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        assert os.path.exists(self.junction_target)
        assert not os.path.exists(self.junction)

    def tearDown(self):
        if not os.path.exists(self.junction):
            return
        # os.rmdir delegates to Windows' RemoveDirectoryW,
        # which removes junction points safely.
        os.rmdir(self.junction)

    def test_create_junction(self):
        link = self.junction
        _winapi.CreateJunction(self.junction_target, link)
        self.assertTrue(os.path.exists(link))
        self.assertTrue(os.path.isdir(link))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(link))

    def test_unlink_removes_junction(self):
        link = self.junction
        _winapi.CreateJunction(self.junction_target, link)
        self.assertTrue(os.path.exists(link))

        os.unlink(link)
        self.assertFalse(os.path.exists(link))
2006
2007
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        # The docstring is raw because of the backslash in the diagram.
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion still runs under "python -O" and is reported
        # as a regular test failure.
        self.assertTrue(os.path.isdir(src))
2037 assert os.path.isdir(src)
2038
2039
class FSEncodingTests(unittest.TestCase):
    """Pass-through and round-trip checks for os.fsencode()/os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() must come
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        names = ['unicode\u0141', 'latin\xe9', 'ascii']
        for original in names:
            try:
                encoded = os.fsencode(original)
            except UnicodeEncodeError:
                # Names the filesystem encoding cannot represent are skipped.
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), original)
Victor Stinnere8d51452010-08-19 01:05:19 +00002053
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002054
Brett Cannonefb00c02012-02-29 18:31:31 -05002055
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    # The condition is evaluated once, when the class body is executed.
    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win')
            or all(hasattr(locale, attr)
                   for attr in ('nl_langinfo', 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a terminal here, so an encoding name known to the codecs
        # registry is expected.
        encoding = os.device_encoding(0)
        self.assertIsNotNone(encoding)
        self.assertTrue(codecs.lookup(encoding))
2069
2070
class PidTests(unittest.TestCase):
    """Tests for process-id related functions, using child interpreters."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        # A child that runs "pass" must be reported as (pid, 0).
        interpreter = sys.executable
        pid = os.spawnv(os.P_NOWAIT, interpreter, [interpreter, '-c', 'pass'])
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2086
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002087
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check for os.getlogin(); unconditionally skipped by the decorator."""

    def test_getlogin(self):
        login = os.getlogin()
        # Any non-empty login name is accepted.
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
2096
2097
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        original = os.getpriority(os.PRIO_PROCESS, pid)
        raised = original + 1
        os.setpriority(os.PRIO_PROCESS, pid, raised)
        try:
            observed = os.getpriority(os.PRIO_PROCESS, pid)
            if original >= 19 and observed <= 19:
                # Starting at 19 or above, the read-back value cannot be
                # used to tell whether the increment took effect.
                self.skipTest(
                    "unable to reliably test setpriority at current nice level of %s" % original)
            self.assertEqual(observed, raised)
        finally:
            # Try to put the priority back; an EACCES failure while doing
            # so is ignored, any other OSError propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, original)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
2120
2121
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Threaded asyncore TCP server used by the os.sendfile() tests.

        The listening socket is created and bound in __init__().  start()
        launches the polling loop in this thread and returns once the loop
        has begun.  The most recently accepted connection is exposed as
        ``handler_instance``; its get_data() returns every byte received.
        """

        class Handler(asynchat.async_chat):
            """Channel for one accepted connection; buffers received data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client reads to synchronize with the server.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return all bytes received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                # Polled by SendfileTestServer.wait().
                self.closed = True

            def handle_error(self):
                # Propagate the exception currently being handled.
                raise

        def __init__(self, address):
            """Bind a listening TCP socket to *address* (host, port)."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Record the address actually bound (the port may have been 0).
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True between the start of run() and a call to stop()."""
            return self._active

        def start(self):
            """Start the server thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the polling loop to exit and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # The with-block releases the lock even when
                # asyncore.loop() raises (handle_error() re-raises).
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Propagate the exception currently being handled.
            raise
2205
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002206
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile(), run against a local SendfileTestServer.

    setUpClass() writes DATA to support.TESTFN.  Each test gets a fresh
    server plus a connected client socket (``self.client`` / ``self.sockno``)
    and an open read handle on the data file (``self.file`` /
    ``self.fileno``).
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
                               not sys.platform.startswith("solaris") and \
                               not sys.platform.startswith("sunos")
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Calls os.sendfile() and retries for as long as it fails with EAGAIN
        or EBUSY; any other OSError propagates.  *headers* and *trailers*
        are only forwarded when SUPPORT_HEADERS_TRAILERS is true.  Their
        defaults are immutable empty tuples rather than shared lists.

        Returns the number of bytes reported as sent by os.sendfile().
        """
        while 1:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                # disconnected (ECONNRESET) or any other failure
                raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        os.sendfile(out=self.sockno, offset=0, count=4096,
                    **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while 1:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the payload itself (length first, as the other tests do)
        # instead of comparing hash() values.
        self.assertEqual(len(data), len(expected_data))
        self.assertEqual(data, expected_data)

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"
        # Register the removal before creating the file so it is cleaned up
        # even if writing or re-opening it fails.
        self.addCleanup(support.unlink, TESTFN2)
        with open(TESTFN2, 'wb') as f:
            f.write(file_data)
        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # EBUSY / EAGAIN are tolerated here; any other error is a failure.
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002393
2394
def supports_extended_attributes():
    """Return True if usable extended-attribute support is available.

    False is returned when os.setxattr does not exist, when setting a
    ``user.test`` attribute on a scratch file (support.TESTFN) raises
    OSError, or when platform.release() reports a 2.6 kernel older than
    2.6.39.  The scratch file is always removed afterwards.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string so "\d" is a regex escape rather than a string escape, and
    # escaped dots so only a literal "2.6.<n>" prefix is recognized.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2410
2411
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr/setxattr/removexattr/listxattr."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run one fixed scenario against the four supplied callables.  *s*
        # converts each attribute name to the type under test; **kwargs are
        # forwarded to every call except listxattr().
        path = support.TESTFN
        open(path, "wb").close()

        def fails_with(code, func, *call_args):
            # func(path, ...) has to raise OSError carrying errno *code*.
            with self.assertRaises(OSError) as caught:
                func(path, *call_args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        def listed():
            return set(listxattr(path))

        fails_with(errno.ENODATA, getxattr, s("user.test"))
        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create the attribute with an empty value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(listed(), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")

        # XATTR_REPLACE on an existing name, XATTR_CREATE on a clash.
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")
        fails_with(errno.EEXIST, setxattr,
                   s("user.test"), b"bye", os.XATTR_CREATE)

        # XATTR_REPLACE on a missing name, then XATTR_CREATE for it.
        fails_with(errno.ENODATA, setxattr,
                   s("user.test2"), b"bye", os.XATTR_REPLACE)
        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(listed(), expected)

        # Removal makes the attribute unreadable and unlisted.
        removexattr(path, s("user.test"), **kwargs)
        fails_with(errno.ENODATA, getxattr, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(listed(), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # A 1024-byte value round-trips.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)

        # One hundred attributes at once.
        bulk_names = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in bulk_names:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(listed(), set(initial) | set(bulk_names))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario twice: with str names, then with bytes names,
        # removing the scratch file in between.
        def to_ascii_bytes(text):
            return text.encode("ascii")

        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(to_ascii_bytes, *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call goes through a freshly opened file
        # descriptor instead of the path.
        def via_fd(os_func, mode):
            def call(path, *args):
                with open(path, mode) as fp:
                    return os_func(fp.fileno(), *args)
            return call

        self._check_xattrs(via_fd(os.getxattr, "rb"),
                           via_fd(os.setxattr, "wb"),
                           via_fd(os.removexattr, "wb"),
                           via_fd(os.listxattr, "rb"))
2487
2488
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Bytes filenames must trigger DeprecationWarning on Windows."""

    def test_deprecated(self):
        import nt
        bytes_name = os.fsencode(support.TESTFN)
        # Each entry is (callable, *arguments).
        calls = (
            (nt._getfullpathname, bytes_name),
            (nt._isdir, bytes_name),
            (os.access, bytes_name, os.R_OK),
            (os.chdir, bytes_name),
            (os.chmod, bytes_name, 0o777),
            (os.getcwdb,),
            (os.link, bytes_name, bytes_name),
            (os.listdir, bytes_name),
            (os.lstat, bytes_name),
            (os.mkdir, bytes_name),
            (os.open, bytes_name, os.O_RDONLY),
            (os.rename, bytes_name, bytes_name),
            (os.rmdir, bytes_name),
            (os.startfile, bytes_name),
            (os.stat, bytes_name),
            (os.unlink, bytes_name),
            (os.utime, bytes_name),
        )
        with warnings.catch_warnings():
            # Turn the warning into an exception so it can be asserted on.
            warnings.simplefilter("error", DeprecationWarning)
            for call in calls:
                func, call_args = call[0], call[1:]
                with self.assertRaises(DeprecationWarning):
                    func(*call_args)

    @support.skip_unless_symlink
    def test_symlink(self):
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(bytes_name, bytes_name)
2524
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002525
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            reported = os.get_terminal_size()
        except OSError as exc:
            tolerated = (sys.platform == "win32"
                         or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not tolerated:
                raise
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")

        for dimension in (reported.columns, reported.lines):
            self.assertGreaterEqual(dimension, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            stty_output = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        fields = stty_output.decode().split()
        # stty's first two fields are used in reversed order
        expected = (int(fields[1]), int(fields[0]))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            tolerated = (sys.platform == "win32"
                         or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not tolerated:
                raise
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002567 self.assertEqual(expected, actual)
2568
2569
class OSErrorTests(unittest.TestCase):
    """A failing os call must report the exact filename object it was given."""

    def setUp(self):
        class Str(str):
            pass

        if support.TESTFN_UNENCODABLE is None:
            text_name = support.TESTFN
        else:
            text_name = support.TESTFN_UNENCODABLE
        if support.TESTFN_UNDECODABLE is None:
            raw_name = os.fsencode(support.TESTFN)
        else:
            raw_name = support.TESTFN_UNDECODABLE

        # Each name is tried both as the plain type and as a second
        # representation (memoryview / str subclass).
        self.bytes_filenames = [raw_name, memoryview(raw_name)]
        self.unicode_filenames = [text_name, Str(text_name)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each case is (filenames to try, function, *extra arguments).
        cases = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]

        def add_if_available(attr, names, *extra):
            # Append a case for os.<attr> when this platform provides it.
            if hasattr(os, attr):
                cases.append((names, getattr(os, attr)) + extra)

        if on_windows:
            cases += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (as_text, os.listdir,),
            ]
        else:
            cases += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        add_if_available("chown", every, 0, 0)
        add_if_available("lchown", every, 0, 0)
        add_if_available("truncate", every, 0)
        add_if_available("chflags", every, 0)
        add_if_available("lchflags", every, 0)
        add_if_available("chroot", every)
        if hasattr(os, "link"):
            if on_windows:
                cases.append((as_bytes, os.link, b"dst"))
                cases.append((as_text, os.link, "dst"))
            else:
                cases.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            cases += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        add_if_available("lchmod", every, 0o777)
        if hasattr(os, "readlink"):
            cases.append((as_text if on_windows else every, os.readlink,))

        for candidates, func, *extra in cases:
            for name in candidates:
                try:
                    func(name, *extra)
                except OSError as err:
                    # The exception must carry the very object passed in.
                    self.assertIs(err.filename, name)
                    continue
                self.fail("No exception thrown by {}".format(func))
2665 self.fail("No exception thrown by {}".format(func))
2666
class CPUCountTests(unittest.TestCase):
    """Check for os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        # Anything other than None must be a positive int.
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2675
Victor Stinnerdaf45552013-08-28 00:53:59 +02002676
class FDInheritanceTests(unittest.TestCase):
    """Tests for the inheritable flag of newly created file descriptors."""

    def _open_this_file(self):
        # Open this source file read-only and close it at test cleanup.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        cleared = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, cleared)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()

        def cloexec_bit():
            return fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC

        self.assertEqual(cloexec_bit(), fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        self.assertEqual(cloexec_bit(), 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        for end in (read_end, write_end):
            self.assertEqual(os.get_inheritable(end), False)

    def test_dup(self):
        source_fd = self._open_this_file()

        duplicate = os.dup(source_fd)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = self._open_this_file()

        # inheritable by default
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source_fd, target)
            self.assertEqual(os.get_inheritable(target), True)
        finally:
            os.close(target)

        # force non-inheritable
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source_fd, target, inheritable=False)
            self.assertEqual(os.get_inheritable(target), False)
        finally:
            os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        for pty_fd in (master_fd, slave_fd):
            self.assertEqual(os.get_inheritable(pty_fd), False)
2759
2760
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip check for os.set_blocking() / os.get_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor is expected to report blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back, reading the flag each time.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
2774
2775
Yury Selivanov97e2e062014-09-26 12:33:06 -04002776
class ExportsTests(unittest.TestCase):
    """Sanity check of the names published through os.__all__."""

    def test_os_all(self):
        # spot-check two names that must be exported
        exported = os.__all__
        for name in ('open', 'walk'):
            self.assertIn(name, exported)
2781
2782
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the entry objects it yields.

    Every test runs inside a scratch directory (self.path) that setUp()
    creates and that is removed again through a registered cleanup.
    """

    def setUp(self):
        scratch = os.path.realpath(support.TESTFN)
        self.path = scratch
        # register the removal before creating the directory
        self.addCleanup(support.rmtree, scratch)
        os.mkdir(scratch)

    def create_file(self, name="file.txt"):
        """Create a small binary file *name* in the scratch directory.

        Return the full path of the created file.
        """
        full_path = os.path.join(self.path, name)
        with open(full_path, "wb") as stream:
            stream.write(b'python')
        return full_path

    def get_entries(self, names):
        """Scan the scratch directory and return a {name: entry} mapping.

        Fail unless the sorted entry names equal *names*.
        """
        by_name = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(by_name), names)
        return by_name

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Compare two stat results.

        When *skip_fields* is true, only the "st_*" attributes are
        compared one by one and st_dev, st_ino and st_nlink are left out;
        otherwise both objects are compared as a whole.
        """
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return

        ignored = ("st_dev", "st_ino", "st_nlink")
        for attr in dir(stat1):
            if attr.startswith("st_") and attr not in ignored:
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Cross-check *entry* against os.stat() and os.path for its path.

        Note: *is_dir* and *is_file* are accepted but not consulted; the
        expected values are derived from os.stat().  Only *is_symlink*
        influences the checks (how strictly stat results are compared on
        Windows).
        """
        on_windows = os.name == 'nt'

        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # type predicates, following symbolic links
        followed = os.stat(entry.path)
        self.assertEqual(entry.is_dir(), stat.S_ISDIR(followed.st_mode))
        self.assertEqual(entry.is_file(), stat.S_ISREG(followed.st_mode))
        self.assertEqual(entry.is_symlink(), os.path.islink(entry.path))

        # type predicates, not following symbolic links
        not_followed = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(not_followed.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(not_followed.st_mode))

        # stat results; on Windows some fields are skipped
        # (see assert_stat_equal)
        self.assert_stat_equal(entry.stat(),
                               followed,
                               on_windows and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               not_followed,
                               on_windows)

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")

        # (name, is_dir, is_file, is_symlink) for every entry created,
        # kept in sorted name order as get_entries() requires
        expected = [
            ('dir', True, False, False),
            ('file.txt', False, True, False),
        ]
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
            expected.append(('link_file.txt', False, True, False))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
            expected.append(('symlink_dir', True, False, True))
            expected.append(('symlink_file.txt', False, True, True))

        entries = self.get_entries([item[0] for item in expected])
        for name, is_dir, is_file, is_symlink in expected:
            self.check_entry(entries[name], name, is_dir, is_file, is_symlink)

    def get_entry(self, name):
        """Return the single entry of the scratch directory.

        Fail unless there is exactly one entry and it is called *name*.
        """
        found = list(os.scandir(self.path))
        self.assertEqual(len(found), 1)

        only_entry = found[0]
        self.assertEqual(only_entry.name, name)
        return only_entry

    def create_file_entry(self):
        """Create the default file and return its scandir() entry."""
        created = self.create_file()
        return self.get_entry(os.path.basename(created))

    def test_current_directory(self):
        filename = self.create_file()
        previous_cwd = os.getcwd()
        try:
            os.chdir(self.path)

            # scandir() called without argument must list the content of
            # the current directory
            by_name = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(by_name),
                             [os.path.basename(filename)])
        finally:
            os.chdir(previous_cwd)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def _check_removed_entry(self, entry):
        # Checks shared by test_removed_dir() and test_removed_file():
        # behaviour of an entry whose underlying file system object has
        # been deleted after the scan.
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            with self.assertRaises(FileNotFoundError):
                entry.inode()
            # these calls must not raise
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            with self.assertRaises(FileNotFoundError):
                entry.stat()
            with self.assertRaises(FileNotFoundError):
                entry.stat(follow_symlinks=False)

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, the is_dir() result depends on whether scandir()
        # filled d_type or not, so it is only checked on Windows
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self._check_removed_entry(entry)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, the is_file() result depends on whether scandir()
        # filled d_type or not, so it is only checked on Windows
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self._check_removed_entry(entry)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises, nothing below runs
            self.skipTest('cannot create symbolic link')

        target = self.create_file("file.txt")
        os.symlink(target,
                   os.path.join(self.path, "symlink.txt"))
        entry = self.get_entries(['file.txt', 'symlink.txt'])['symlink.txt']
        # removing the target turns the link into a broken one
        os.unlink(target)

        self.assertGreater(entry.inode(), 0)
        # a broken symlink is neither a directory nor a file, whether or
        # not links are followed
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        with self.assertRaises(FileNotFoundError):
            entry.stat()
        # must not raise
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        if os.name == "nt":
            # On Windows, os.scandir(bytes) must raise an exception
            with self.assertRaises(TypeError):
                os.scandir(b'.')
            return

        self.create_file("file.txt")

        found = list(os.scandir(os.fsencode(self.path)))
        self.assertEqual(len(found), 1, found)
        entry = found[0]

        # a bytes path yields bytes names and bytes paths
        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        with self.assertRaises(FileNotFoundError):
            os.scandir('')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        first_pass = list(iterator)
        self.assertEqual(len(first_pass), 1, first_pass)

        # check that consuming the iterator a second time doesn't raise an
        # exception and yields nothing
        second_pass = list(iterator)
        self.assertEqual(len(second_pass), 0, second_pass)

    def test_bad_path_type(self):
        for bad_path in (1234, 1.234, {}, []):
            with self.assertRaises(TypeError):
                os.scandir(bad_path)
3005
3006
# Run the whole test module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()