blob: b3cf2072c33a3d5dc9d41e5d9dd2dd1df2014124 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import platform
19import re
Victor Stinner47aacc82015-06-12 17:26:23 +020020import shutil
21import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000022import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020024import subprocess
25import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010026import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020027import time
28import unittest
29import uuid
30import warnings
31from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000032try:
33 import threading
34except ImportError:
35 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020036try:
37 import resource
38except ImportError:
39 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020040try:
41 import fcntl
42except ImportError:
43 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010044try:
45 import _winapi
46except ImportError:
47 _winapi = None
# ``groups``: ids of the groups the current user is a member of, plus the
# process's own gid when os.getgid() exists.  Empty if grp is unavailable.
try:
    import grp
    _group_entries = grp.getgrall()
    # The user name does not change between group entries, so look it up once
    # rather than once per entry.  It is still not looked up at all when the
    # group database is empty, exactly as before.
    _current_user = getpass.getuser() if _group_entries else None
    groups = [g.gr_gid for g in _group_entries if _current_user in g.gr_mem]
    if hasattr(os, 'getgid'):
        process_gid = os.getgid()
        if process_gid not in groups:
            groups.append(process_gid)
except ImportError:
    groups = []

# ``all_users``: every uid listed in the password database, or an empty list
# if pwd is unavailable.
try:
    import pwd
    all_users = [u.pw_uid for u in pwd.getpwall()]
except ImportError:
    all_users = []
62try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020064except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020065 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020066
Berker Peksagce643912015-05-06 06:33:17 +030067from test.support.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000068
# True only when os.geteuid() exists and reports an effective uid of 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = (
    os.getgid() == 0 if sys.platform.startswith('freebsd') else False
)
84
Thomas Wouters0e3f5912006-08-11 14:57:12 +000085# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for the descriptor-level file functions of os, run on TESTFN."""

    def setUp(self):
        # Remove any TESTFN left behind.  The same routine is reused as
        # tearDown so each test both starts and ends without the file.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A freshly created file must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the refcount of the
        # path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() returns exactly the bytes written, as a bytes object.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        with open(support.TESTFN, "wb") as fp:
            fp.write(b'test')
        self.addCleanup(support.unlink, support.TESTFN)

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a write or assertion above
            # fails, so a failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command given by *args and require exit status 0.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the descriptor with os.fdopen(*args)
        # and close the resulting file object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it without O_CREAT.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination and remove
        # the source.
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
211
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200212
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000213# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Test attributes on return values from the os.*stat* family."""

    def setUp(self):
        # Every test works on a 3-byte file that is removed afterwards.
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        with open(self.fname, 'wb') as fp:
            fp.write(b"ABC")

    def _statvfs_or_skip(self):
        # Return os.statvfs(self.fname).  Skip the running test when the call
        # fails with ENOSYS; propagate every other OSError so that the real
        # failure is reported instead of a later error on a missing result.
        try:
            return os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            raise

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        # Shared checks for a stat result of the 3-byte test file; *fname*
        # may be a str or a bytes path.
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # The attribute is compared to the indexed field after
                    # truncation to an integer.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        # Same checks with the path encoded to bytes.
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        # A stat result must round-trip through every pickle protocol.
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = self._statvfs_or_skip()

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        # A statvfs result must round-trip through every pickle protocol.
        result = self._statvfs_or_skip()

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        # st_file_attributes must exist and be an int in [0, 0xFFFFFFFF].
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)
420
Victor Stinner47aacc82015-06-12 17:26:23 +0200421
class UtimeTests(unittest.TestCase):
    """Tests for os.utime().

    Most tests supply a ``set_time`` callback that applies timestamps in one
    particular way (ns=, times=, positional tuple, fd, dir_fd, ...) and hand
    it to a shared checker that verifies the result through os.stat().
    """

    def setUp(self):
        # Work inside a scratch directory holding one 3-byte file.
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        with open(self.fname, 'wb') as fp:
            fp.write(b"ABC")

        def restore_float_times(state):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)

                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)

            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        # Call set_time(filename, (atime_ns, mtime_ns)) and check that
        # os.stat() reports these timestamps.  *filename* defaults to
        # self.fname.
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb') as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        # Call set_time(self.fname), which must stamp the file with the
        # current time, and compare st_mtime with the clock read just before.

        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms
            delta = 0.020
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way.  Use the path passed in
            # by the checker, like every other set_time callback.
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        # Return the file system name of the volume holding *path* as
        # reported by GetVolumeInformationW() on Windows, or None if it is
        # unknown (including on every other platform).
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
615
616
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000617from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000618
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000619class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000620 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000621 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000622
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000623 def setUp(self):
624 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000625 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000626 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000627 for key, value in self._reference().items():
628 os.environ[key] = value
629
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000630 def tearDown(self):
631 os.environ.clear()
632 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000633 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000634 os.environb.clear()
635 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000636
Christian Heimes90333392007-11-01 19:08:42 +0000637 def _reference(self):
638 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
639
640 def _empty_mapping(self):
641 os.environ.clear()
642 return os.environ
643
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000644 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300645 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000646 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000647 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300648 os.environ.update(HELLO="World")
649 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
650 value = popen.read().strip()
651 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000652
Ezio Melottic7e139b2012-09-26 20:01:34 +0300653 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000654 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300655 with os.popen(
656 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
657 it = iter(popen)
658 self.assertEqual(next(it), "line1\n")
659 self.assertEqual(next(it), "line2\n")
660 self.assertEqual(next(it), "line3\n")
661 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000662
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000663 # Verify environ keys and values from the OS are of the
664 # correct str type.
665 def test_keyvalue_types(self):
666 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000667 self.assertEqual(type(key), str)
668 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000669
Christian Heimes90333392007-11-01 19:08:42 +0000670 def test_items(self):
671 for key, value in self._reference().items():
672 self.assertEqual(os.environ.get(key), value)
673
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000674 # Issue 7310
675 def test___repr__(self):
676 """Check that the repr() of os.environ looks like environ({...})."""
677 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000678 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
679 '{!r}: {!r}'.format(key, value)
680 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000681
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000682 def test_get_exec_path(self):
683 defpath_list = os.defpath.split(os.pathsep)
684 test_path = ['/monty', '/python', '', '/flying/circus']
685 test_env = {'PATH': os.pathsep.join(test_path)}
686
687 saved_environ = os.environ
688 try:
689 os.environ = dict(test_env)
690 # Test that defaulting to os.environ works.
691 self.assertSequenceEqual(test_path, os.get_exec_path())
692 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
693 finally:
694 os.environ = saved_environ
695
696 # No PATH environment variable
697 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
698 # Empty PATH environment variable
699 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
700 # Supplied PATH environment variable
701 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
702
Victor Stinnerb745a742010-05-18 17:17:23 +0000703 if os.supports_bytes_environ:
704 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000705 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000706 # ignore BytesWarning warning
707 with warnings.catch_warnings(record=True):
708 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000709 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000710 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000711 pass
712 else:
713 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000714
715 # bytes key and/or value
716 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
717 ['abc'])
718 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
719 ['abc'])
720 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
721 ['abc'])
722
723 @unittest.skipUnless(os.supports_bytes_environ,
724 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000725 def test_environb(self):
726 # os.environ -> os.environb
727 value = 'euro\u20ac'
728 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000729 value_bytes = value.encode(sys.getfilesystemencoding(),
730 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000731 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000732 msg = "U+20AC character is not encodable to %s" % (
733 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000734 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000735 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000736 self.assertEqual(os.environ['unicode'], value)
737 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000738
739 # os.environb -> os.environ
740 value = b'\xff'
741 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000742 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000743 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000744 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000745
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Deleting a variable with an invalid name must raise."""
    if sys.platform != "win32":
        # "=" is not allowed in a variable name
        expected_error, key = OSError, 'key='
    else:
        # an environment variable is limited to 32,767 characters
        expected_error, key = ValueError, 'x' * 50000
    self.assertRaises(expected_error, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100759
Victor Stinner6d101392013-04-14 16:35:04 +0200760 def test_key_type(self):
761 missing = 'missingkey'
762 self.assertNotIn(missing, os.environ)
763
Victor Stinner839e5ea2013-04-14 16:43:03 +0200764 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200765 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200766 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200767 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200768
Victor Stinner839e5ea2013-04-14 16:43:03 +0200769 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200770 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200771 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200772 self.assertTrue(cm.exception.__suppress_context__)
773
Victor Stinner6d101392013-04-14 16:35:04 +0200774
Tim Petersc4e09402003-04-25 07:11:48 +0000775class WalkTests(unittest.TestCase):
776 """Tests for os.walk()."""
777
Victor Stinner0561c532015-03-12 10:28:24 +0100778 # Wrapper to hide minor differences between os.walk and os.fwalk
779 # to tests both functions with the same code base
780 def walk(self, directory, topdown=True, follow_symlinks=False):
781 walk_it = os.walk(directory,
782 topdown=topdown,
783 followlinks=follow_symlinks)
784 for root, dirs, files in walk_it:
785 yield (root, dirs, files)
786
Charles-François Natali7372b062012-02-05 15:15:38 +0100787 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100788 join = os.path.join
Tim Petersc4e09402003-04-25 07:11:48 +0000789
790 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000791 # TESTFN/
792 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000793 # tmp1
794 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000795 # tmp2
796 # SUB11/ no kids
797 # SUB2/ a file kid and a dirsymlink kid
798 # tmp3
799 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200800 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000801 # TEST2/
802 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100803 self.walk_path = join(support.TESTFN, "TEST1")
804 self.sub1_path = join(self.walk_path, "SUB1")
805 self.sub11_path = join(self.sub1_path, "SUB11")
806 sub2_path = join(self.walk_path, "SUB2")
807 tmp1_path = join(self.walk_path, "tmp1")
808 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000809 tmp3_path = join(sub2_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100810 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000811 t2_path = join(support.TESTFN, "TEST2")
812 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200813 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000814
815 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100816 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000817 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000818 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100819
Guido van Rossumd8faa362007-04-27 19:54:29 +0000820 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000821 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000822 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
823 f.close()
824
Victor Stinner0561c532015-03-12 10:28:24 +0100825 if support.can_symlink():
826 os.symlink(os.path.abspath(t2_path), self.link_path)
827 os.symlink('broken', broken_link_path, True)
828 self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
829 else:
830 self.sub2_tree = (sub2_path, [], ["tmp3"])
831
832 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000833 # Walk top-down.
Victor Stinner0561c532015-03-12 10:28:24 +0100834 all = list(os.walk(self.walk_path))
835
Tim Petersc4e09402003-04-25 07:11:48 +0000836 self.assertEqual(len(all), 4)
837 # We can't know which order SUB1 and SUB2 will appear in.
838 # Not flipped: TESTFN, SUB1, SUB11, SUB2
839 # flipped: TESTFN, SUB2, SUB1, SUB11
840 flipped = all[0][1][0] != "SUB1"
841 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200842 all[3 - 2 * flipped][-1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100843 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
844 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
845 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
846 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000847
Victor Stinner0561c532015-03-12 10:28:24 +0100848 def test_walk_prune(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000849 # Prune the search.
850 all = []
Victor Stinner0561c532015-03-12 10:28:24 +0100851 for root, dirs, files in self.walk(self.walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000852 all.append((root, dirs, files))
853 # Don't descend into SUB1.
854 if 'SUB1' in dirs:
855 # Note that this also mutates the dirs we appended to all!
856 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000857
Victor Stinner0561c532015-03-12 10:28:24 +0100858 self.assertEqual(len(all), 2)
859 self.assertEqual(all[0],
860 (self.walk_path, ["SUB2"], ["tmp1"]))
861
862 all[1][-1].sort()
863 self.assertEqual(all[1], self.sub2_tree)
864
865 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000866 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100867 all = list(self.walk(self.walk_path, topdown=False))
868
Tim Petersc4e09402003-04-25 07:11:48 +0000869 self.assertEqual(len(all), 4)
870 # We can't know which order SUB1 and SUB2 will appear in.
871 # Not flipped: SUB11, SUB1, SUB2, TESTFN
872 # flipped: SUB2, SUB11, SUB1, TESTFN
873 flipped = all[3][1][0] != "SUB1"
874 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200875 all[2 - 2 * flipped][-1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100876 self.assertEqual(all[3],
877 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
878 self.assertEqual(all[flipped],
879 (self.sub11_path, [], []))
880 self.assertEqual(all[flipped + 1],
881 (self.sub1_path, ["SUB11"], ["tmp2"]))
882 self.assertEqual(all[2 - 2 * flipped],
883 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000884
Victor Stinner0561c532015-03-12 10:28:24 +0100885 def test_walk_symlink(self):
886 if not support.can_symlink():
887 self.skipTest("need symlink support")
888
889 # Walk, following symlinks.
890 walk_it = self.walk(self.walk_path, follow_symlinks=True)
891 for root, dirs, files in walk_it:
892 if root == self.link_path:
893 self.assertEqual(dirs, [])
894 self.assertEqual(files, ["tmp4"])
895 break
896 else:
897 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000898
899 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000900 # Tear everything down. This is a decent use for bottom-up on
901 # Windows, which doesn't have a recursive delete command. The
902 # (not so) subtlety is that rmdir will fail unless the dir's
903 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000904 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000905 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000906 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000907 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000908 dirname = os.path.join(root, name)
909 if not os.path.islink(dirname):
910 os.rmdir(dirname)
911 else:
912 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000913 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000914
Charles-François Natali7372b062012-02-05 15:15:38 +0100915
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, directory, topdown=True, follow_symlinks=False):
        """Yield (root, dirs, files) triples from os.fwalk(), dropping the
        directory file descriptor so the inherited tests can be reused."""
        walk_it = os.fwalk(directory,
                           topdown=topdown,
                           follow_symlinks=follow_symlinks)
        for root, dirs, files, root_fd in walk_it:
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        Compare os.fwalk(**fwalk_kwargs) with os.walk(**walk_kwargs) for
        every combination of topdown and symlink following.
        """
        # Work on copies: the caller's dicts must not be mutated.
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide that failure behind a NameError.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup, addressing every entry relative to its directory fd
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    # a symlink to a directory: remove the link itself
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
997
998
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a scratch support.TESTFN dir."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the umask even when an assertion fails, so a failure here
        # cannot leak a modified umask into later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even if an assertion fails: tearDown() relies on
        # os.removedirs(), which cannot delete a regular file.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1077
Andrew Svetlov405faed2012-12-25 12:18:09 +02001078
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), run against a directory at support.TESTFN."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        info = os.stat(support.TESTFN)
        uid, gid = info.st_uid, info.st_gid
        # Numbers that are not integers must be rejected for uid and gid.
        non_integers = (-1.0, -1j, decimal.Decimal(-1),
                        fractions.Fraction(-2, 2))
        for value in non_integers:
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        uid = os.stat(support.TESTFN).st_uid
        # Switch to each of the first two groups in turn and read it back.
        for wanted_gid in groups[:2]:
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        gid = os.stat(support.TESTFN).st_gid
        # Switch to each of the first two users in turn and read it back.
        for wanted_uid in all_users[:2]:
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        first_uid, second_uid = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # NOTE(review): both calls share one assertRaises block, so the
        # second only runs if the first succeeds; presumably this covers
        # the case where the first uid is the current user -- confirm.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, first_uid, gid)
            os.chown(support.TESTFN, second_uid, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1131
1132
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira/dirb and return both paths, outermost first.
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def _write_file(self, directory):
        # Drop a small file into directory so that it is no longer empty.
        with open(os.path.join(directory, 'file.txt'), 'w') as f:
            f.write('text')

    def test_remove_all(self):
        # Every directory is empty: the whole chain goes away.
        outer, inner = self._make_nested_dirs()
        os.removedirs(inner)
        for gone in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # The parent holds a file: only the leaf is removed.
        outer, inner = self._make_nested_dirs()
        self._write_file(outer)
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for kept in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # The leaf itself holds a file: removedirs() fails, nothing is removed.
        outer, inner = self._make_nested_dirs()
        self._write_file(inner)
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for kept in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1174
1175
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull device path."""

    def test_devnull(self):
        """Data written to os.devnull is discarded; reading yields nothing."""
        # The with statement closes the file; no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001183
Andrew Svetlov405faed2012-12-25 12:18:09 +02001184
Guido van Rossume7ba4952007-06-06 23:52:48 +00001185class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001186 def test_urandom_length(self):
1187 self.assertEqual(len(os.urandom(0)), 0)
1188 self.assertEqual(len(os.urandom(1)), 1)
1189 self.assertEqual(len(os.urandom(10)), 10)
1190 self.assertEqual(len(os.urandom(100)), 100)
1191 self.assertEqual(len(os.urandom(1000)), 1000)
1192
1193 def test_urandom_value(self):
1194 data1 = os.urandom(16)
1195 data2 = os.urandom(16)
1196 self.assertNotEqual(data1, data2)
1197
1198 def get_urandom_subprocess(self, count):
1199 code = '\n'.join((
1200 'import os, sys',
1201 'data = os.urandom(%s)' % count,
1202 'sys.stdout.buffer.write(data)',
1203 'sys.stdout.buffer.flush()'))
1204 out = assert_python_ok('-c', code)
1205 stdout = out[1]
1206 self.assertEqual(len(stdout), 16)
1207 return stdout
1208
1209 def test_urandom_subprocess(self):
1210 data1 = self.get_urandom_subprocess(16)
1211 data2 = self.get_urandom_subprocess(16)
1212 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001213
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001214
# Build configuration flags read through sysconfig: each is True only when
# the corresponding configuration variable is set to 1.
HAVE_GETENTROPY = sysconfig.get_config_var('HAVE_GETENTROPY') == 1
HAVE_GETRANDOM = sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001217
@unittest.skipIf(HAVE_GETENTROPY,
                 "getentropy() does not use a file descriptor")
@unittest.skipIf(HAVE_GETRANDOM,
                 "getrandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it relies on a file descriptor.

    The whole class is skipped when getentropy() or getrandom() is
    available, since those do not use a file descriptor.  Each scenario
    runs in a child interpreter via assert_python_ok().
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        # The child lowers its RLIMIT_NOFILE soft limit to 1 and expects
        # os.urandom() to raise OSError with errno EMFILE.
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        # The child calls urandom() once, closes descriptors 3..255, then
        # calls urandom() again and writes the result to stdout.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        rc, out, err = assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        # TESTFN is filled with a constant byte so that, if urandom() read
        # from the substituted descriptor, successive results would match.
        with open(support.TESTFN, 'wb') as f:
            f.write(b"x" * 256)
        self.addCleanup(os.unlink, support.TESTFN)
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        rc, out, err = assert_python_ok('-Sc', code)
        # Two 4-byte reads within one child must differ from each other ...
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # ... and a second child must not reproduce the first child's output.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1292
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001293
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001294@contextlib.contextmanager
1295def _execvpe_mockup(defpath=None):
1296 """
1297 Stubs out execv and execve functions when used as context manager.
1298 Records exec calls. The mock execv and execve functions always raise an
1299 exception as they would normally never return.
1300 """
1301 # A list of tuples containing (function name, first arg, args)
1302 # of calls to execv or execve that have been made.
1303 calls = []
1304
1305 def mock_execv(name, *args):
1306 calls.append(('execv', name, args))
1307 raise RuntimeError("execv called")
1308
1309 def mock_execve(name, *args):
1310 calls.append(('execve', name, args))
1311 raise OSError(errno.ENOTDIR, "execve called")
1312
1313 try:
1314 orig_execv = os.execv
1315 orig_execve = os.execve
1316 orig_defpath = os.defpath
1317 os.execv = mock_execv
1318 os.execve = mock_execve
1319 if defpath is not None:
1320 os.defpath = defpath
1321 yield calls
1322 finally:
1323 os.execv = orig_execv
1324 os.execve = orig_execve
1325 os.defpath = orig_defpath
1326
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with a str or bytes program name, using
        mocked execv/execve, and check which call it ends up making."""
        use_bytes = test_type is bytes
        program_path = os.sep + 'absolutepath'
        if use_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as recorded:
            self.assertRaises(RuntimeError,
                              os._execvpe, fullpath, arguments)
            self.assertEqual(len(recorded), 1)
            self.assertEqual(recorded[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as recorded:
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env)
            self.assertEqual(len(recorded), 1)
            self.assertSequenceEqual(
                recorded[0],
                ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as recorded:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            self.assertRaises(OSError,
                              os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(recorded), 1)
            self.assertSequenceEqual(
                recorded[0],
                ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        # The bytes variant is only run where os.name is not "nt".
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001390
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001391
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Each call below is expected to raise OSError on Windows."""

    def test_rename(self):
        with self.assertRaises(OSError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(OSError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(OSError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # A regular file already occupies the name, so mkdir() must fail.
        blocker = open(support.TESTFN, "w")
        try:
            with self.assertRaises(OSError):
                os.mkdir(support.TESTFN)
        finally:
            blocker.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(OSError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(OSError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001416
class TestInvalidFD(unittest.TestCase):
    """Check how os functions react to an invalid file descriptor.

    Most tests go through check(), which calls the function with the
    descriptor returned by support.make_bad_fd() and expects OSError with
    errno EBADF.
    """

    # os functions that take the descriptor as their only argument; a
    # test_<name> method is generated for each of them below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build the test method for the os function named *f*.
        def helper(self):
            # Report a skip instead of a silent pass when the os module
            # does not provide the function.
            if not hasattr(os, f):
                self.skipTest('test needs os.%s()' % f)
            self.check(getattr(os, f))
        return helper
    for f in singles:
        # Inside a class body locals() is the namespace being built, so
        # this adds the generated methods to the class.
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require OSError with errno EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() is expected to return False rather than raise.
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        # The loop stops at the first descriptor that fstat() accepts, so
        # i ends up as the number of invalid descriptors found (or 9 when
        # none of the ten probed descriptors is valid).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # pathconf() is called with the descriptor too, not with a path.
        self.check(os.pathconf, "PC_NAME_MAX")
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # truncate() is called with the descriptor too, not with a path.
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buf = bytearray(10)
        self.check(os.readv, [buf])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1515
Brian Curtin1b9df392010-11-24 20:24:31 +00001516
@unittest.skipUnless(hasattr(os, 'link'), 'test needs os.link()')
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        # file1 is the link source and file2 the link to be created.
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files the test left behind.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it as file2 and verify that both names
        # open the same underlying file.
        with open(file1, "w") as f1:
            f1.write("test")

        # Ignore any DeprecationWarning emitted while creating the link.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the file system encoding together with its
        # error handler, so it accepts every name the str-based tests can
        # use; a strict bytes(name, encoding) conversion does not.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # tearDown reads self.file1/self.file2, so updating the attributes
        # also makes it clean up the renamed files.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1553
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Tests for the os.set*uid() and os.set*gid() family of functions.

    Each test expects OverflowError for the id 1<<32; the OSError checks
    only run under the uid / HAVE_WHEEL_GROUP conditions spelled out in
    each method.
    """

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        # The overflow check applies to either argument position.
        with self.assertRaises(OverflowError):
            os.setreuid(1 << 32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1 << 32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # -1 must be accepted.  The call is made in a child interpreter so
        # the test runner's own process state is left alone (issue8045).
        command = [
            sys.executable, '-c',
            'import os,sys;os.setreuid(-1,-1);sys.exit(0)']
        subprocess.check_call(command)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        # The overflow check applies to either argument position.
        with self.assertRaises(OverflowError):
            os.setregid(1 << 32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1 << 32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # -1 must be accepted.  The call is made in a child interpreter so
        # the test runner's own process state is left alone (issue8045).
        command = [
            sys.executable, '-c',
            'import os,sys;os.setregid(-1,-1);sys.exit(0)']
        subprocess.check_call(command)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001609
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Exercise os functions on a directory of non-ASCII file names.

    setUp creates the directory and one empty file per name that
    os.fsencode() can encode; the decoded names are kept in
    self.unicodefn for the tests to iterate over.
    """

    def setUp(self):
        # Use the first non-empty candidate as the directory name.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)

        # Keep only the names the file system encoding can represent.
        bytesfn = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for raw_name in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, raw_name))
                decoded = os.fsdecode(raw_name)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # Remove the directory again before propagating the failure.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        listed = set(os.listdir(self.dir))
        self.assertEqual(listed, self.unicodefn)
        # test listdir without arguments
        previous_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            without_argument = set(os.listdir())
            with_argument = set(os.listdir(os.sep))
            self.assertEqual(without_argument, with_argument)
        finally:
            os.chdir(previous_cwd)

    def test_open(self):
        # Opening must succeed; the content is irrelevant.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            full_path = os.path.join(self.dir, name)
            os.stat(full_path)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001681
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows, driven through helper subprocesses."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_attempts times, 0.1s apart.  The else clause runs
        # when the loop ends without break, i.e. the message never arrived
        # or the subprocess exited first.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait until
        # the shared one-byte mapping reads 1, send *event* with os.kill()
        # and fail if the subprocess is still running afterwards.  *name*
        # is only used in the failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test finishes, however it ends.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        # NOTE(review): the byte is presumably set to 1 by the helper
        # script once it is ready -- the script is not visible here.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the process is still running; an
        # exit code of 0 must not be mistaken for "did not stop".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1796
1797
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0/SUB1 directories and
        # FILE0/FILE1 files, remembering the sorted entry names.
        entry_names = []
        for index in range(2):
            subdir_name = 'SUB%d' % index
            subdir_path = os.path.join(support.TESTFN, subdir_name)
            file_name = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, file_name)
            os.makedirs(subdir_path)
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it.  Blame test_os.\n" % file_path)
            entry_names += [subdir_name, file_name]
        self.created_paths = sorted(entry_names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)
        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(listed, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)
        # unicode
        extended = '\\\\?\\' + absolute
        self.assertEqual(sorted(os.listdir(extended)), self.created_paths)
        # bytes
        extended = b'\\\\?\\' + os.fsencode(absolute)
        expected = [os.fsencode(name) for name in self.created_paths]
        self.assertEqual(sorted(os.listdir(extended)), expected)
1842
1843
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic links to files and directories on Windows."""

    # Link names are relative, so the links are created in the current
    # working directory; the targets are this file and its directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: the targets exist and no link is left over from
        # an earlier run.  unittest assertions are used so the checks
        # still run when Python is started with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is needed here: the target of missing_link never
        # exists, so exists() would report False for the link itself.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() follows the link and must match the target, while
        # lstat() describes the link itself and must differ.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        # Repeat the checks with a bytes path, ignoring any
        # DeprecationWarning raised for it.
        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # os.stat() on a relative symlink must reach the target from the
        # link's own directory, a directory below it and one above it.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so the cleanup code below can
        # always refer to it, even when creating the directories fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so a failure early
            # in the test is not masked by an error raised during cleanup.
            if os.path.exists(file1):
                os.remove(file1)
            if os.path.isdir(level1):
                shutil.rmtree(level1)
1961
Brian Curtind40e6f72010-07-08 21:39:08 +00001962
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for junctions created with _winapi.CreateJunction()."""

    # The junction name is relative; its target is this file's directory.
    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # The target has to exist and the junction name has to be free.
        assert os.path.exists(self.junction_target)
        assert not os.path.exists(self.junction)

    def tearDown(self):
        if not os.path.exists(self.junction):
            return
        # os.rmdir delegates to Windows' RemoveDirectoryW,
        # which removes junction points safely.
        os.rmdir(self.junction)

    def test_create_junction(self):
        link = self.junction
        _winapi.CreateJunction(self.junction_target, link)
        self.assertTrue(os.path.exists(link))
        self.assertTrue(os.path.isdir(link))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(link))

    def test_unlink_removes_junction(self):
        link = self.junction
        _winapi.CreateJunction(self.junction_target, link)
        self.assertTrue(os.path.exists(link))

        os.unlink(link)
        self.assertFalse(os.path.exists(link))
1992
1993
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        # Raw docstring: the backslash in the diagram is literal text and
        # would otherwise form an invalid escape sequence.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion keeps the check active under "python -O",
        # where a bare assert statement is removed.
        self.assertTrue(os.path.isdir(src))
2024
2025
class FSEncodingTests(unittest.TestCase):
    """Round-trip checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() must come
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        samples = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in samples:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # The file system encoding cannot represent this name.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002039
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002040
Brett Cannonefb00c02012-02-29 18:31:31 -05002041
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    # Conditions for test_device_encoding, evaluated once when the class
    # body runs: fd 0 must be a tty, and the platform must be Windows or
    # provide locale.nl_langinfo together with locale.CODESET.
    _has_codeset = (hasattr(locale, 'nl_langinfo') and
                    hasattr(locale, 'CODESET'))
    _encoding_testable = os.isatty(0) and (
        sys.platform.startswith('win') or _has_codeset)

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        _encoding_testable,
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The reported encoding must be known to the codecs registry.
        encoding = os.device_encoding(0)
        self.assertIsNotNone(encoding)
        self.assertTrue(codecs.lookup(encoding))
2055
2056
class PidTests(unittest.TestCase):
    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child interpreter prints its parent's pid, which has to be
        # the pid of this process.
        command = [sys.executable, '-c',
                   'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        self.assertEqual(int(output), os.getpid())
2066
2067
# When this TestCase was introduced it caused at least two different errors
# on *nix buildbots, so it is skipped for now to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # The login name only has to be non-empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2076
2077
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        which = os.PRIO_PROCESS

        original = os.getpriority(which, pid)
        os.setpriority(which, pid, original + 1)
        try:
            observed = os.getpriority(which, pid)
            if original >= 19 and observed <= 19:
                # Already at the top nice level: the increment cannot be
                # observed, so the result would be meaningless.
                raise unittest.SkipTest(
                    "unable to reliably test setpriority at current nice level of %s" % original)
            self.assertEqual(observed, original + 1)
        finally:
            # Best effort restore of the starting priority; a refusal with
            # EACCES is tolerated, anything else is propagated.
            try:
                os.setpriority(which, pid, original)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
2100
2101
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Minimal asyncore server, run in its own thread, that records
        everything received on the connection it accepts.

        The handler created for an accepted connection is stored in
        ``handler_instance``; its ``get_data()`` returns the received bytes.
        """

        class Handler(asynchat.async_chat):
            """Per-connection handler that buffers all incoming data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return every byte received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled instead
                # of letting asyncore log and swallow it.
                raise

        def __init__(self, address):
            """Bind and listen on *address*, a (host, port) pair."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True while the serving loop is active."""
            return self._active

        def start(self):
            """Start the server thread and block until its loop has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the serving loop to exit and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            """Block until the handler's connection is closed, then stop."""
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # The context manager releases the lock even when a
                # handler's handle_error() re-raises out of the loop.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            # The listening socket never has anything to write.
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
2185
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002186
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Exercise os.sendfile() by sending a file to a SendfileTestServer
    and checking what the server received."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are not exercised on these platforms.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
            'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while the call fails with EAGAIN or EBUSY; any other
        OSError (including ECONNRESET) is propagated.  *headers* and
        *trailers* default to empty lists and are only passed on when
        SUPPORT_HEADERS_TRAILERS is true.
        """
        # None sentinels avoid sharing one mutable list between calls.
        if headers is None:
            headers = []
        if trailers is None:
            trailers = []
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    # disconnected, or any other real failure
                    raise
                # otherwise we have to retry sending the data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the payload itself (length first, for a short failure
        # message) rather than its hash.
        self.assertEqual(len(data), len(expected_data))
        self.assertEqual(data, expected_data)

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"
        # Register the removal before creating the file so it is cleaned up
        # even if a later step fails; support.unlink() is used elsewhere in
        # this class for removal.
        self.addCleanup(support.unlink, TESTFN2)
        with open(TESTFN2, 'wb') as f:
            f.write(file_data)
        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            # EBUSY / EAGAIN are acceptable outcomes with SF_NODISKIO.
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002365
2366
def supports_extended_attributes():
    """Return True if extended attributes are usable on this system.

    False is returned when os.setxattr is missing, when setting a
    ``user.test`` attribute on a scratch file fails with OSError, or when
    the kernel release is 2.6.x with x < 39 (those kernels do not respect
    the setxattr flags).
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string so "\d" is a regex escape rather than an invalid string
    # escape; dots are escaped so only a literal "2.6." prefix matches.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
2382
2383
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr/setxattr/removexattr/listxattr."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the attribute scenario against a fresh scratch file.

        *s* converts an attribute name to the type under test; the four
        callables take the file path as first argument.  *kwargs* is
        forwarded to getxattr, setxattr and removexattr.
        """
        path = support.TESTFN
        open(path, "wb").close()

        def expect_errno(code, call, *call_args):
            # The call must fail with OSError carrying exactly *code*.
            with self.assertRaises(OSError) as caught:
                call(path, *call_args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        def current_names():
            return set(listxattr(path))

        # Nothing is set yet.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))
        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create an empty attribute, then replace its value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(current_names(), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses an existing name, XATTR_REPLACE a missing one.
        expect_errno(errno.EEXIST, setxattr,
                     s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr,
                     s("user.test2"), b"bye", os.XATTR_REPLACE)
        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(current_names(), expected)

        # Removal makes the attribute unreadable again.
        removexattr(path, s("user.test"), **kwargs)
        expect_errno(errno.ENODATA, getxattr, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(current_names(), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # A larger value round-trips.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)

        # Many attributes at once.
        bulk_names = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in bulk_names:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(current_names(), set(initial) | set(bulk_names))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario with str names, then again with bytes names."""
        def make_bytes(s):
            return bytes(s, "ascii")

        for convert in (str, make_bytes):
            if convert is not str:
                # Start the second pass from a clean file.
                support.unlink(support.TESTFN)
            self._check_xattrs_str(convert, *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrappers that open the path and hand the descriptor to os.*xattr.
        def getxattr(path, *args):
            with open(path, "rb") as handle:
                return os.getxattr(handle.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb") as handle:
                os.setxattr(handle.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb") as handle:
                os.removexattr(handle.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as handle:
                return os.listxattr(handle.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002458 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2459
2460
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Bytes filenames must raise DeprecationWarning on Windows."""

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # Each entry is a callable followed by its positional arguments.
        calls = (
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        )
        with warnings.catch_warnings():
            # Turn the warning into an exception so it can be asserted on.
            warnings.simplefilter("error", DeprecationWarning)
            for func, *args in calls:
                with self.assertRaises(DeprecationWarning):
                    func(*args)

    @support.skip_unless_symlink
    def test_symlink(self):
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                os.symlink(filename, filename)
2495 os.symlink, filename, filename)
2496
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002497
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            unqueryable = (sys.platform == "win32" or
                           exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not unqueryable:
                raise
            self.skipTest("failed to query terminal size")

        for dimension in (size.columns, size.lines):
            self.assertGreaterEqual(dimension, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            raw = subprocess.check_output(['stty', 'size'])
            size = raw.decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        # stty prints "rows columns"; get_terminal_size is (columns, lines)
        rows, columns = size[0], size[1]
        expected = (int(columns), int(rows))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            unqueryable = (sys.platform == "win32" or
                           exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not unqueryable:
                raise
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
2540
2541
class OSErrorTests(unittest.TestCase):
    """An OSError raised by a path-taking os function must expose, as its
    ``filename`` attribute, the very object the caller passed in."""

    def setUp(self):
        class Str(str):
            pass

        text_name = support.TESTFN_UNENCODABLE
        if text_name is None:
            text_name = support.TESTFN
        raw_name = support.TESTFN_UNDECODABLE
        if raw_name is None:
            raw_name = os.fsencode(support.TESTFN)

        # Each flavour is tried both as the plain type and as a look-alike
        # (a str subclass, a memoryview over the bytes).
        self.bytes_filenames = [raw_name, memoryview(raw_name)]
        self.unicode_filenames = [text_name, Str(text_name)]
        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each case: (names to try, function, extra positional args...)
        cases = [
            (every, os.chdir),
            (every, os.chmod, 0o777),
            (every, os.lstat),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir),
            (every, os.stat),
            (every, os.unlink),
        ]
        if on_windows:
            cases += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (as_text, os.listdir),
            ]
        else:
            cases += [
                (every, os.listdir),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                cases.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                cases.append((as_bytes, os.link, b"dst"))
                cases.append((as_text, os.link, "dst"))
            else:
                cases.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            cases += [
                (every, os.listxattr),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            cases.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            readlink_names = as_text if on_windows else every
            cases.append((readlink_names, os.readlink))

        for names, func, *extra_args in cases:
            for name in names:
                try:
                    func(name, *extra_args)
                except OSError as err:
                    # Identity, not equality: the same object comes back.
                    self.assertIs(err.filename, name)
                else:
                    self.fail("No exception thrown by {}".format(func))
2638
class CPUCountTests(unittest.TestCase):
    """Check for os.cpu_count()."""

    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            # None means the number of CPUs is undetermined.
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2647
Victor Stinnerdaf45552013-08-28 00:53:59 +02002648
class FDInheritanceTests(unittest.TestCase):
    """Descriptors created through os are expected to be non-inheritable
    unless explicitly requested otherwise."""

    def _open_self(self):
        # Open this source file read-only; the descriptor is closed when
        # the test's cleanups run.
        descriptor = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, descriptor)
        return descriptor

    def test_get_set_inheritable(self):
        fd = self._open_self()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_self()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        without_cloexec = fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, without_cloexec)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_self()

        def cloexec_bit():
            return fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC

        # The flag starts out set and is cleared by set_inheritable(True).
        self.assertEqual(cloexec_bit(), fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        self.assertEqual(cloexec_bit(), 0)

    def test_open(self):
        fd = self._open_self()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertEqual(os.get_inheritable(read_end), False)
        self.assertEqual(os.get_inheritable(write_end), False)

    def test_dup(self):
        original = self._open_self()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = self._open_self()

        # inheritable by default
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source, target)
            self.assertEqual(os.get_inheritable(target), True)
        finally:
            os.close(target)

        # force non-inheritable
        target = os.open(__file__, os.O_RDONLY)
        try:
            os.dup2(source, target, inheritable=False)
            self.assertEqual(os.get_inheritable(target), False)
        finally:
            os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        for descriptor in (master_fd, slave_fd):
            self.assertEqual(os.get_inheritable(descriptor), False)
2731
2732
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip check for os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor is in blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch it off and back on, reading the mode back each time.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
2746
2747
Yury Selivanov97e2e062014-09-26 12:33:06 -04002748
class ExportsTests(unittest.TestCase):
    """Sanity check of the names listed in os.__all__."""

    def test_os_all(self):
        exported = os.__all__
        for name in ('open', 'walk'):
            self.assertIn(name, exported)
2753
2754
Victor Stinner6036e442015-03-08 01:58:04 +01002755class TestScandir(unittest.TestCase):
2756 def setUp(self):
2757 self.path = os.path.realpath(support.TESTFN)
2758 self.addCleanup(support.rmtree, self.path)
2759 os.mkdir(self.path)
2760
2761 def create_file(self, name="file.txt"):
2762 filename = os.path.join(self.path, name)
2763 with open(filename, "wb") as fp:
2764 fp.write(b'python')
2765 return filename
2766
2767 def get_entries(self, names):
2768 entries = dict((entry.name, entry)
2769 for entry in os.scandir(self.path))
2770 self.assertEqual(sorted(entries.keys()), names)
2771 return entries
2772
2773 def assert_stat_equal(self, stat1, stat2, skip_fields):
2774 if skip_fields:
2775 for attr in dir(stat1):
2776 if not attr.startswith("st_"):
2777 continue
2778 if attr in ("st_dev", "st_ino", "st_nlink"):
2779 continue
2780 self.assertEqual(getattr(stat1, attr),
2781 getattr(stat2, attr),
2782 (stat1, stat2, attr))
2783 else:
2784 self.assertEqual(stat1, stat2)
2785
2786 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
2787 self.assertEqual(entry.name, name)
2788 self.assertEqual(entry.path, os.path.join(self.path, name))
2789 self.assertEqual(entry.inode(),
2790 os.stat(entry.path, follow_symlinks=False).st_ino)
2791
2792 entry_stat = os.stat(entry.path)
2793 self.assertEqual(entry.is_dir(),
2794 stat.S_ISDIR(entry_stat.st_mode))
2795 self.assertEqual(entry.is_file(),
2796 stat.S_ISREG(entry_stat.st_mode))
2797 self.assertEqual(entry.is_symlink(),
2798 os.path.islink(entry.path))
2799
2800 entry_lstat = os.stat(entry.path, follow_symlinks=False)
2801 self.assertEqual(entry.is_dir(follow_symlinks=False),
2802 stat.S_ISDIR(entry_lstat.st_mode))
2803 self.assertEqual(entry.is_file(follow_symlinks=False),
2804 stat.S_ISREG(entry_lstat.st_mode))
2805
2806 self.assert_stat_equal(entry.stat(),
2807 entry_stat,
2808 os.name == 'nt' and not is_symlink)
2809 self.assert_stat_equal(entry.stat(follow_symlinks=False),
2810 entry_lstat,
2811 os.name == 'nt')
2812
2813 def test_attributes(self):
2814 link = hasattr(os, 'link')
2815 symlink = support.can_symlink()
2816
2817 dirname = os.path.join(self.path, "dir")
2818 os.mkdir(dirname)
2819 filename = self.create_file("file.txt")
2820 if link:
2821 os.link(filename, os.path.join(self.path, "link_file.txt"))
2822 if symlink:
2823 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
2824 target_is_directory=True)
2825 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
2826
2827 names = ['dir', 'file.txt']
2828 if link:
2829 names.append('link_file.txt')
2830 if symlink:
2831 names.extend(('symlink_dir', 'symlink_file.txt'))
2832 entries = self.get_entries(names)
2833
2834 entry = entries['dir']
2835 self.check_entry(entry, 'dir', True, False, False)
2836
2837 entry = entries['file.txt']
2838 self.check_entry(entry, 'file.txt', False, True, False)
2839
2840 if link:
2841 entry = entries['link_file.txt']
2842 self.check_entry(entry, 'link_file.txt', False, True, False)
2843
2844 if symlink:
2845 entry = entries['symlink_dir']
2846 self.check_entry(entry, 'symlink_dir', True, False, True)
2847
2848 entry = entries['symlink_file.txt']
2849 self.check_entry(entry, 'symlink_file.txt', False, True, True)
2850
2851 def get_entry(self, name):
2852 entries = list(os.scandir(self.path))
2853 self.assertEqual(len(entries), 1)
2854
2855 entry = entries[0]
2856 self.assertEqual(entry.name, name)
2857 return entry
2858
2859 def create_file_entry(self):
2860 filename = self.create_file()
2861 return self.get_entry(os.path.basename(filename))
2862
2863 def test_current_directory(self):
2864 filename = self.create_file()
2865 old_dir = os.getcwd()
2866 try:
2867 os.chdir(self.path)
2868
2869 # call scandir() without parameter: it must list the content
2870 # of the current directory
2871 entries = dict((entry.name, entry) for entry in os.scandir())
2872 self.assertEqual(sorted(entries.keys()),
2873 [os.path.basename(filename)])
2874 finally:
2875 os.chdir(old_dir)
2876
2877 def test_repr(self):
2878 entry = self.create_file_entry()
2879 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
2880
2881 def test_removed_dir(self):
2882 path = os.path.join(self.path, 'dir')
2883
2884 os.mkdir(path)
2885 entry = self.get_entry('dir')
2886 os.rmdir(path)
2887
2888 # On POSIX, is_dir() result depends if scandir() filled d_type or not
2889 if os.name == 'nt':
2890 self.assertTrue(entry.is_dir())
2891 self.assertFalse(entry.is_file())
2892 self.assertFalse(entry.is_symlink())
2893 if os.name == 'nt':
2894 self.assertRaises(FileNotFoundError, entry.inode)
2895 # don't fail
2896 entry.stat()
2897 entry.stat(follow_symlinks=False)
2898 else:
2899 self.assertGreater(entry.inode(), 0)
2900 self.assertRaises(FileNotFoundError, entry.stat)
2901 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
2902
2903 def test_removed_file(self):
2904 entry = self.create_file_entry()
2905 os.unlink(entry.path)
2906
2907 self.assertFalse(entry.is_dir())
2908 # On POSIX, is_dir() result depends if scandir() filled d_type or not
2909 if os.name == 'nt':
2910 self.assertTrue(entry.is_file())
2911 self.assertFalse(entry.is_symlink())
2912 if os.name == 'nt':
2913 self.assertRaises(FileNotFoundError, entry.inode)
2914 # don't fail
2915 entry.stat()
2916 entry.stat(follow_symlinks=False)
2917 else:
2918 self.assertGreater(entry.inode(), 0)
2919 self.assertRaises(FileNotFoundError, entry.stat)
2920 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
2921
2922 def test_broken_symlink(self):
2923 if not support.can_symlink():
2924 return self.skipTest('cannot create symbolic link')
2925
2926 filename = self.create_file("file.txt")
2927 os.symlink(filename,
2928 os.path.join(self.path, "symlink.txt"))
2929 entries = self.get_entries(['file.txt', 'symlink.txt'])
2930 entry = entries['symlink.txt']
2931 os.unlink(filename)
2932
2933 self.assertGreater(entry.inode(), 0)
2934 self.assertFalse(entry.is_dir())
2935 self.assertFalse(entry.is_file()) # broken symlink returns False
2936 self.assertFalse(entry.is_dir(follow_symlinks=False))
2937 self.assertFalse(entry.is_file(follow_symlinks=False))
2938 self.assertTrue(entry.is_symlink())
2939 self.assertRaises(FileNotFoundError, entry.stat)
2940 # don't fail
2941 entry.stat(follow_symlinks=False)
2942
2943 def test_bytes(self):
2944 if os.name == "nt":
2945 # On Windows, os.scandir(bytes) must raise an exception
2946 self.assertRaises(TypeError, os.scandir, b'.')
2947 return
2948
2949 self.create_file("file.txt")
2950
2951 path_bytes = os.fsencode(self.path)
2952 entries = list(os.scandir(path_bytes))
2953 self.assertEqual(len(entries), 1, entries)
2954 entry = entries[0]
2955
2956 self.assertEqual(entry.name, b'file.txt')
2957 self.assertEqual(entry.path,
2958 os.fsencode(os.path.join(self.path, 'file.txt')))
2959
2960 def test_empty_path(self):
2961 self.assertRaises(FileNotFoundError, os.scandir, '')
2962
2963 def test_consume_iterator_twice(self):
2964 self.create_file("file.txt")
2965 iterator = os.scandir(self.path)
2966
2967 entries = list(iterator)
2968 self.assertEqual(len(entries), 1, entries)
2969
2970 # check than consuming the iterator twice doesn't raise exception
2971 entries2 = list(iterator)
2972 self.assertEqual(len(entries2), 0, entries2)
2973
2974 def test_bad_path_type(self):
2975 for obj in [1234, 1.234, {}, []]:
2976 self.assertRaises(TypeError, os.scandir, obj)
2977
2978
# Run the tests with unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()