blob: b540fcbd4d73a774cd048781a8a220062def0a1e [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Paul Monson62dfd7d2019-04-25 11:36:45 -070031from platform import win32_is_iot
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020032
Antoine Pitrouec34ab52013-08-16 20:44:38 +020033try:
34 import resource
35except ImportError:
36 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020037try:
38 import fcntl
39except ImportError:
40 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010041try:
42 import _winapi
43except ImportError:
44 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020045try:
R David Murrayf2ad1732014-12-25 18:36:56 -050046 import grp
47 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
48 if hasattr(os, 'getgid'):
49 process_gid = os.getgid()
50 if process_gid not in groups:
51 groups.append(process_gid)
52except ImportError:
53 groups = []
54try:
55 import pwd
56 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010057except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050058 all_users = []
59try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020060 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020061except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020063
Berker Peksagce643912015-05-06 06:33:17 +030064from test.support.script_helper import assert_python_ok
Serhiy Storchakab21d1552018-03-02 11:53:51 +020065from test.support import unix_shell, FakePath
Fred Drake38c2ef02001-07-17 20:52:51 +000066
Victor Stinner923590e2016-03-24 09:11:48 +010067
# True only when the platform exposes geteuid() and the effective uid is 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    hasattr(sys, 'thread_info')
    # thread_info.version may be None or empty; treat that as "not linuxthreads".
    and bool(sys.thread_info.version)
    and sys.thread_info.version.startswith("linuxthreads")
)

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# The platform check short-circuits, so os.getgid() is only called on FreeBSD.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
83
Victor Stinner923590e2016-03-24 09:11:48 +010084
def requires_os_func(name):
    """Return a decorator that skips the test unless ``os`` has attribute *name*."""
    reason = 'requires os.%s' % name
    available = hasattr(os, name)
    return unittest.skipUnless(available, reason)
87
88
def create_file(filename, content=b'content'):
    """Create *filename* and write *content* (bytes) to it, unbuffered.

    The file is opened in exclusive-creation mode ("x"), so the call fails
    with FileExistsError when *filename* already exists.
    """
    fp = open(filename, "xb", buffering=0)
    with fp:
        fp.write(content)
92
93
Thomas Wouters0e3f5912006-08-11 14:57:12 +000094# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level file functions of the os module.

    Every test may create support.TESTFN; setUp/tearDown remove it.
    """

    def setUp(self):
        # lexists() rather than exists(): also catch a dangling symlink
        # named TESTFN (see test_symlink_keywords).
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the refcount of
        # its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GiB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even when a check above fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command line *args* and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only and wrap the fd with os.fdopen(fd, *args).
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_invalid_values(self):
        with self.assertRaises(ValueError):
            os.copy_file_range(0, 1, -10)

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range(self):
        TESTFN2 = support.TESTFN + ".3"
        data = b'0123456789'

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN2, 'w+b')
        # Cleanups run in reverse order: close the file, then unlink it.
        self.addCleanup(support.unlink, TESTFN2)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, 5)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, 6))

            # Use a distinct name so the still-open input file object
            # is not shadowed.
            with open(TESTFN2, 'rb') as copied:
                self.assertEqual(copied.read(), data[:i])

    @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()')
    def test_copy_file_range_offset(self):
        TESTFN4 = support.TESTFN + ".4"
        data = b'0123456789'
        bytes_to_copy = 6
        in_skip = 3
        out_seek = 5

        create_file(support.TESTFN, data)
        self.addCleanup(support.unlink, support.TESTFN)

        in_file = open(support.TESTFN, 'rb')
        self.addCleanup(in_file.close)
        in_fd = in_file.fileno()

        out_file = open(TESTFN4, 'w+b')
        # Cleanups run in reverse order: close the file, then unlink it.
        self.addCleanup(support.unlink, TESTFN4)
        self.addCleanup(out_file.close)
        out_fd = out_file.fileno()

        try:
            i = os.copy_file_range(in_fd, out_fd, bytes_to_copy,
                                   offset_src=in_skip,
                                   offset_dst=out_seek)
        except OSError as e:
            # Handle the case in which Python was compiled
            # in a system with the syscall but without support
            # in the kernel.
            if e.errno != errno.ENOSYS:
                raise
            self.skipTest(e)
        else:
            # The number of copied bytes can be less than
            # the number of bytes originally requested.
            self.assertIn(i, range(0, bytes_to_copy+1))

            # Use a distinct name so the still-open input file object
            # is not shadowed.
            with open(TESTFN4, 'rb') as copied:
                read = copied.read()
            # seeked bytes (5) are zero'ed
            self.assertEqual(read[:out_seek], b'\x00'*out_seek)
            # 012 are skipped (in_skip)
            # 345678 are copied in the file (in_skip + bytes_to_copy)
            self.assertEqual(read[out_seek:],
                             data[in_skip:in_skip+i])
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200317
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000318# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Test attributes on return values from the os.*stat* family."""

    def setUp(self):
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        # Three bytes: the size checks below rely on st_size == 3.
        create_file(self.fname, b"ABC")

    def check_stat_attributes(self, fname):
        # Shared checks for os.stat(fname); fname may be str or bytes.
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] != 'ST_':
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Truncate the time attribute before comparing it with
                # the corresponding tuple item.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Out-of-range index must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Deliberately lenient:
        # the call may either succeed or raise TypeError.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = os.statvfs(self.fname)

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        self.assertIsInstance(result.f_fsid, int)

        # Test that the size of the tuple doesn't change
        self.assertEqual(len(result), 10)

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  Deliberately lenient:
        # the call may either succeed or raise TypeError.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = os.statvfs(self.fname)

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        # Once closed, stat'ing the fd must fail with EBADF.
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        # st_file_attributes must exist and be an int in [0, 0xFFFFFFFF].
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertGreaterEqual(result.st_file_attributes, 0)
        self.assertLessEqual(result.st_file_attributes, 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_access_denied(self):
        # Default to FindFirstFile WIN32_FIND_DATA when access is
        # denied. See issue 28075.
        # os.environ['TEMP'] should be located on a volume that
        # supports file ACLs.
        fname = os.path.join(os.environ['TEMP'], self.fname)
        self.addCleanup(support.unlink, fname)
        create_file(fname, b'ABC')
        # Deny the right to [S]YNCHRONIZE on the file to
        # force CreateFile to fail with ERROR_ACCESS_DENIED.
        DETACHED_PROCESS = 8
        subprocess.check_call(
            # bpo-30584: Use security identifier *S-1-5-32-545 instead
            # of localized "Users" to not depend on the locale.
            ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
            creationflags=DETACHED_PROCESS
        )
        result = os.stat(fname)
        self.assertNotEqual(result.st_size, 0)
537
Victor Stinner47aacc82015-06-12 17:26:23 +0200538
539class UtimeTests(unittest.TestCase):
540 def setUp(self):
541 self.dirname = support.TESTFN
542 self.fname = os.path.join(self.dirname, "f1")
543
544 self.addCleanup(support.rmtree, self.dirname)
545 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100546 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200547
Victor Stinner47aacc82015-06-12 17:26:23 +0200548 def support_subsecond(self, filename):
549 # Heuristic to check if the filesystem supports timestamp with
550 # subsecond resolution: check if float and int timestamps are different
551 st = os.stat(filename)
552 return ((st.st_atime != st[7])
553 or (st.st_mtime != st[8])
554 or (st.st_ctime != st[9]))
555
556 def _test_utime(self, set_time, filename=None):
557 if not filename:
558 filename = self.fname
559
560 support_subsecond = self.support_subsecond(filename)
561 if support_subsecond:
562 # Timestamp with a resolution of 1 microsecond (10^-6).
563 #
564 # The resolution of the C internal function used by os.utime()
565 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
566 # test with a resolution of 1 ns requires more work:
567 # see the issue #15745.
568 atime_ns = 1002003000 # 1.002003 seconds
569 mtime_ns = 4005006000 # 4.005006 seconds
570 else:
571 # use a resolution of 1 second
572 atime_ns = 5 * 10**9
573 mtime_ns = 8 * 10**9
574
575 set_time(filename, (atime_ns, mtime_ns))
576 st = os.stat(filename)
577
578 if support_subsecond:
579 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
580 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
581 else:
582 self.assertEqual(st.st_atime, atime_ns * 1e-9)
583 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
584 self.assertEqual(st.st_atime_ns, atime_ns)
585 self.assertEqual(st.st_mtime_ns, mtime_ns)
586
587 def test_utime(self):
588 def set_time(filename, ns):
589 # test the ns keyword parameter
590 os.utime(filename, ns=ns)
591 self._test_utime(set_time)
592
593 @staticmethod
594 def ns_to_sec(ns):
595 # Convert a number of nanosecond (int) to a number of seconds (float).
596 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
597 # issue, os.utime() rounds towards minus infinity.
598 return (ns * 1e-9) + 0.5e-9
599
600 def test_utime_by_indexed(self):
601 # pass times as floating point seconds as the second indexed parameter
602 def set_time(filename, ns):
603 atime_ns, mtime_ns = ns
604 atime = self.ns_to_sec(atime_ns)
605 mtime = self.ns_to_sec(mtime_ns)
606 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
607 # or utime(time_t)
608 os.utime(filename, (atime, mtime))
609 self._test_utime(set_time)
610
611 def test_utime_by_times(self):
612 def set_time(filename, ns):
613 atime_ns, mtime_ns = ns
614 atime = self.ns_to_sec(atime_ns)
615 mtime = self.ns_to_sec(mtime_ns)
616 # test the times keyword parameter
617 os.utime(filename, times=(atime, mtime))
618 self._test_utime(set_time)
619
620 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
621 "follow_symlinks support for utime required "
622 "for this test.")
623 def test_utime_nofollow_symlinks(self):
624 def set_time(filename, ns):
625 # use follow_symlinks=False to test utimensat(timespec)
626 # or lutimes(timeval)
627 os.utime(filename, ns=ns, follow_symlinks=False)
628 self._test_utime(set_time)
629
630 @unittest.skipUnless(os.utime in os.supports_fd,
631 "fd support for utime required for this test.")
632 def test_utime_fd(self):
633 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100634 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200635 # use a file descriptor to test futimens(timespec)
636 # or futimes(timeval)
637 os.utime(fp.fileno(), ns=ns)
638 self._test_utime(set_time)
639
640 @unittest.skipUnless(os.utime in os.supports_dir_fd,
641 "dir_fd support for utime required for this test.")
642 def test_utime_dir_fd(self):
643 def set_time(filename, ns):
644 dirname, name = os.path.split(filename)
645 dirfd = os.open(dirname, os.O_RDONLY)
646 try:
647 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
648 os.utime(name, dir_fd=dirfd, ns=ns)
649 finally:
650 os.close(dirfd)
651 self._test_utime(set_time)
652
653 def test_utime_directory(self):
654 def set_time(filename, ns):
655 # test calling os.utime() on a directory
656 os.utime(filename, ns=ns)
657 self._test_utime(set_time, filename=self.dirname)
658
659 def _test_utime_current(self, set_time):
660 # Get the system clock
661 current = time.time()
662
663 # Call os.utime() to set the timestamp to the current system clock
664 set_time(self.fname)
665
666 if not self.support_subsecond(self.fname):
667 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700668 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200669 # On Windows, the usual resolution of time.time() is 15.6 ms.
670 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700671 #
672 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
673 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200674 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200675 st = os.stat(self.fname)
676 msg = ("st_time=%r, current=%r, dt=%r"
677 % (st.st_mtime, current, st.st_mtime - current))
678 self.assertAlmostEqual(st.st_mtime, current,
679 delta=delta, msg=msg)
680
681 def test_utime_current(self):
682 def set_time(filename):
683 # Set to the current time in the new way
684 os.utime(self.fname)
685 self._test_utime_current(set_time)
686
687 def test_utime_current_old(self):
688 def set_time(filename):
689 # Set to the current time in the old explicit way.
690 os.utime(self.fname, None)
691 self._test_utime_current(set_time)
692
693 def get_file_system(self, path):
694 if sys.platform == 'win32':
695 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
696 import ctypes
697 kernel32 = ctypes.windll.kernel32
698 buf = ctypes.create_unicode_buffer("", 100)
699 ok = kernel32.GetVolumeInformationW(root, None, 0,
700 None, None, None,
701 buf, len(buf))
702 if ok:
703 return buf.value
704 # return None if the filesystem is unknown
705
706 def test_large_time(self):
707 # Many filesystems are limited to the year 2038. At least, the test
708 # pass with NTFS filesystem.
709 if self.get_file_system(self.dirname) != "NTFS":
710 self.skipTest("requires NTFS")
711
712 large = 5000000000 # some day in 2128
713 os.utime(self.fname, (large, large))
714 self.assertEqual(os.stat(self.fname).st_mtime, large)
715
716 def test_utime_invalid_arguments(self):
717 # seconds and nanoseconds parameters are mutually exclusive
718 with self.assertRaises(ValueError):
719 os.utime(self.fname, (5, 5), ns=(5, 5))
Serhiy Storchaka32bc11c2018-12-01 14:30:20 +0200720 with self.assertRaises(TypeError):
721 os.utime(self.fname, [5, 5])
722 with self.assertRaises(TypeError):
723 os.utime(self.fname, (5,))
724 with self.assertRaises(TypeError):
725 os.utime(self.fname, (5, 5, 5))
726 with self.assertRaises(TypeError):
727 os.utime(self.fname, ns=[5, 5])
728 with self.assertRaises(TypeError):
729 os.utime(self.fname, ns=(5,))
730 with self.assertRaises(TypeError):
731 os.utime(self.fname, ns=(5, 5, 5))
732
733 if os.utime not in os.supports_follow_symlinks:
734 with self.assertRaises(NotImplementedError):
735 os.utime(self.fname, (5, 5), follow_symlinks=False)
736 if os.utime not in os.supports_fd:
737 with open(self.fname, 'wb', 0) as fp:
738 with self.assertRaises(TypeError):
739 os.utime(fp.fileno(), (5, 5))
740 if os.utime not in os.supports_dir_fd:
741 with self.assertRaises(NotImplementedError):
742 os.utime(self.fname, (5, 5), dir_fd=0)
Victor Stinner47aacc82015-06-12 17:26:23 +0200743
@support.cpython_only
def test_issue31577(self):
    # The interpreter shouldn't crash in case utime() received a bad
    # ns argument: each bogus __divmod__ result must surface as TypeError.
    def get_bad_int(divmod_ret_val):
        class BadInt:
            def __divmod__(*args):
                return divmod_ret_val
        return BadInt()

    for bogus_result in (42, (), (1, 2, 3)):
        with self.assertRaises(TypeError):
            os.utime(self.fname, ns=(get_bad_int(bogus_result), 1))
759
Victor Stinner47aacc82015-06-12 17:26:23 +0200760
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000761from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000762
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests against os.environ."""
    type2test = None

    def setUp(self):
        # Snapshot the environment (str and, where available, bytes view)
        # so tearDown() can restore it, then install the reference items.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        reference = self._reference()
        for key in reference:
            os.environ[key] = reference[key]

    def tearDown(self):
        # Put back exactly what setUp() captured.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1": "VALUE1", "KEY2": "VALUE2", "KEY3": "VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        command = "%s -c 'echo $HELLO'" % unix_shell
        with os.popen(command) as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                         'requires a shell')
    def test_os_popen_iter(self):
        command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
        with os.popen(command) as popen:
            it = iter(popen)
            for expected_line in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(it), expected_line)
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, content in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(content), str)

    def test_items(self):
        reference = self._reference()
        for key in reference:
            self.assertEqual(os.environ.get(key), reference[key])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        rendered_items = ', '.join(
            '{!r}: {!r}'.format(key, value) for key, value in env.items())
        self.assertEqual(repr(env), 'environ({{{}}})'.format(rendered_items))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily swap os.environ for a plain dict.
        saved_environ = os.environ
        os.environ = dict(test_env)
        try:
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        for bytes_env in ({b'PATH': b'abc'},
                          {b'PATH': 'abc'},
                          {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(bytes_env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            self.skipTest(
                "U+20AC character is not encodable to %s" % (fs_encoding,))
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key, expected_error = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            key, expected_error = 'key=', OSError
        self.assertRaises(expected_error, os.environ.__delitem__, key)

    def test_key_type(self):
        missing = 'missingkey'
        self.assertNotIn(missing, os.environ)

        # Both lookup and deletion must raise KeyError carrying the very
        # same key object, with the internal exception context suppressed.
        for operation in (os.environ.__getitem__, os.environ.__delitem__):
            with self.assertRaises(KeyError) as cm:
                operation(missing)
            self.assertIs(cm.exception.args[0], missing)
            self.assertTrue(cm.exception.__suppress_context__)

    def _test_environ_iteration(self, collection):
        # Start iterating over *collection*, add a key to os.environ, then
        # continue iterating and check the new key is still readable.
        new_key = "__new_key__"
        iterator = iter(collection)

        next(iterator)  # start iteration over os.environ.items

        # add a new key in os.environ mapping
        os.environ[new_key] = "test_environ_iteration"

        try:
            next(iterator)  # force iteration over modified mapping
            self.assertEqual(os.environ[new_key], "test_environ_iteration")
        finally:
            del os.environ[new_key]

    def test_iter_error_when_changing_os_environ(self):
        self._test_environ_iteration(os.environ)

    def test_iter_error_when_changing_os_environ_items(self):
        self._test_environ_iteration(os.environ.items())

    def test_iter_error_when_changing_os_environ_values(self):
        self._test_environ_iteration(os.environ.values())
941
Victor Stinner6d101392013-04-14 16:35:04 +0200942
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Wrapper to hide minor differences between os.walk and os.fwalk
    # so that both functions can be tested with the same code base.
    def walk(self, top, **kwargs):
        if 'follow_symlinks' in kwargs:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        return os.walk(top, **kwargs)

    def setUp(self):
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           SUB21/          not readable
        #             tmp5
        #           link/           a symlink to TEST2
        #           broken_link
        #           broken_link2
        #           broken_link3
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        sub21_path = join(sub2_path, "SUB21")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        # Named "tmp5" so that it matches the tree above and the target of
        # broken_link3 below (the original created it as "tmp3").
        tmp5_path = join(sub21_path, "tmp5")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")
        broken_link2_path = join(sub2_path, "broken_link2")
        broken_link3_path = join(sub2_path, "broken_link3")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(sub21_path)
        os.makedirs(t2_path)

        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")

        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
            os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
            self.sub2_tree = (sub2_path, ["SUB21", "link"],
                              ["broken_link", "broken_link2", "broken_link3",
                               "tmp3"])
        else:
            self.sub2_tree = (sub2_path, ["SUB21"], ["tmp3"])

        # Make SUB21 unreadable.  If the directory can still be listed after
        # chmod(0), drop it from the fixture and from the expected tree.
        os.chmod(sub21_path, 0)
        try:
            os.listdir(sub21_path)
        except PermissionError:
            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
        else:
            os.chmod(sub21_path, stat.S_IRWXU)
            os.unlink(tmp5_path)
            os.rmdir(sub21_path)
            del self.sub2_tree[1][:1]

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(self.walk(self.walk_path))

        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        sub2_index = 3 - 2 * flipped
        walked[0][1].sort()
        walked[sub2_index][-1].sort()
        walked[sub2_index][1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[sub2_index], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        walked = []
        for root, dirs, files in self.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')

        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))

        walked[1][-1].sort()
        walked[1][1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_file_like_path(self):
        self.test_walk_prune(FakePath(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(walked), 4, walked)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        sub2_index = 2 - 2 * flipped
        walked[3][1].sort()
        walked[sub2_index][-1].sort()
        walked[sub2_index][1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped],
                         (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[sub2_index],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        for root, dirs, files in walk_it:
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertEqual(errors, [])
        # Rename SUB1 away mid-walk: the walk must report an error for it
        # and still visit the remaining directories.
        dir1 = 'SUB1'
        path1 = os.path.join(root, dir1)
        path1new = os.path.join(root, dir1 + '.new')
        os.rename(path1, path1new)
        try:
            roots = [r for r, d, f in walk_it]
            self.assertTrue(errors)
            self.assertNotIn(path1, roots)
            self.assertNotIn(path1new, roots)
            for dir2 in dirs:
                if dir2 != dir1:
                    self.assertIn(os.path.join(root, dir2), roots)
        finally:
            os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001116
Charles-François Natali7372b062012-02-05 15:15:38 +01001117
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Adapt fwalk()'s 4-tuples to the 3-tuples the inherited tests expect.
        for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def fwalk(self, *args, **kwargs):
        return os.fwalk(*args, **kwargs)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # fails, its error is reported as-is instead of being masked by a
        # NameError on the unbound ``fd`` in the finally clause.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in self.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1185
class BytesWalkTests(WalkTests):
    """Run the os.walk() tests with a bytes top directory."""
    def walk(self, top, **kwargs):
        # os.walk() spells the option "followlinks".
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
            # Hand str names to the shared tests ...
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files)
            # ... and write back whatever the consumer left in the lists,
            # so in-place edits (e.g. pruning) reach os.walk().
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1198
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class BytesFwalkTests(FwalkTests):
    """Run the os.fwalk() tests with a bytes top directory."""
    def fwalk(self, top='.', *args, **kwargs):
        encoded_top = os.fsencode(top)
        for broot, bdirs, bfiles, topfd in os.fwalk(encoded_top, *args, **kwargs):
            # Hand str names to the shared tests ...
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files, topfd)
            # ... and write back whatever the consumer left in the lists,
            # so in-place edits reach os.fwalk().
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1210
Charles-François Natali7372b062012-02-05 15:15:38 +01001211
class MakedirTests(unittest.TestCase):
    # Tests for os.makedirs().  Every test works below TESTFN, which setUp()
    # creates and tearDown() removes again.

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_mode(self):
        with support.temp_umask(0o002):
            base = support.TESTFN
            parent = os.path.join(base, 'dir1')
            path = os.path.join(parent, 'dir2')
            os.makedirs(path, 0o555)
            self.assertTrue(os.path.exists(path))
            self.assertTrue(os.path.isdir(path))
            if os.name != 'nt':
                self.assertEqual(os.stat(path).st_mode & 0o777, 0o555)
                self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the umask even when an assertion fails, so a failure here
        # cannot leak a modified umask into the tests that run afterwards.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Always delete the regular file: tearDown() calls os.removedirs(),
        # which cannot remove a non-directory left behind by a failure.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1304
Andrew Svetlov405faed2012-12-25 12:18:09 +02001305
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    # Tests for os.chown(), run against a scratch directory (TESTFN) that
    # is created once for the class and removed afterwards.

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target_stat = os.stat(support.TESTFN)
        uid, gid = target_stat.st_uid, target_stat.st_gid
        # Non-integer numbers must be rejected for both uid and gid.
        non_index_values = (-1.0, -1j, decimal.Decimal(-1),
                            fractions.Fraction(-2, 2))
        for value in non_index_values:
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        uid = os.stat(support.TESTFN).st_uid
        # Switch the group to each of the first two groups in turn.
        for wanted_gid in groups[:2]:
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        gid = os.stat(support.TESTFN).st_gid
        # Switch the owner to each of the first two users in turn.
        for wanted_uid in all_users[:2]:
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        first_uid, second_uid = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, first_uid, gid)
            os.chown(support.TESTFN, second_uid, gid)
1358
1359
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_chain(self):
        # Create TESTFN/dira/dirb and return the (dira, dirb) paths.
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def test_remove_all(self):
        # Every directory is empty, so the whole chain is removed.
        outer, inner = self._make_chain()
        os.removedirs(inner)
        for gone in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(gone))

    def test_remove_partial(self):
        # A file in dira stops the pruning after dirb has been removed.
        outer, inner = self._make_chain()
        create_file(os.path.join(outer, 'file.txt'))
        os.removedirs(inner)
        self.assertFalse(os.path.exists(inner))
        for kept in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        # A file in dirb makes the very first rmdir fail; nothing is removed.
        outer, inner = self._make_chain()
        create_file(os.path.join(inner, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for kept in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1399
1400
class DevNullTests(unittest.TestCase):
    """Check that os.devnull accepts writes and reads back as empty."""

    def test_devnull(self):
        # Unbuffered binary write; the with-statement closes the file, so no
        # explicit close() call is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading the null device yields no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001408
Andrew Svetlov405faed2012-12-25 12:18:09 +02001409
class URandomTests(unittest.TestCase):
    """Tests for os.urandom(), in-process and in a child interpreter."""

    def test_urandom_length(self):
        # The result has exactly the requested number of bytes.
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        first = os.urandom(16)
        self.assertIsInstance(first, bytes)
        second = os.urandom(16)
        self.assertNotEqual(first, second)

    def get_urandom_subprocess(self, count):
        # Run os.urandom(count) in a child interpreter and return the bytes
        # it wrote to stdout.
        script_lines = [
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()',
        ]
        code = '\n'.join(script_lines)
        _, stdout, _ = assert_python_ok('-c', code)
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate child processes must produce different data.
        first = self.get_urandom_subprocess(16)
        second = self.get_urandom_subprocess(16)
        self.assertNotEqual(first, second)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001439
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001440
@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
class GetRandomTests(unittest.TestCase):
    """Tests for os.getrandom()."""

    @classmethod
    def setUpClass(cls):
        try:
            os.getrandom(1)
        except OSError as exc:
            if exc.errno != errno.ENOSYS:
                raise
            # Python compiled on a more recent Linux version
            # than the current Linux kernel
            raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")

    def test_getrandom_type(self):
        blob = os.getrandom(16)
        self.assertIsInstance(blob, bytes)
        self.assertEqual(len(blob), 16)

    def test_getrandom0(self):
        self.assertEqual(os.getrandom(0), b'')

    def test_getrandom_random(self):
        self.assertTrue(hasattr(os, 'GRND_RANDOM'))

        # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
        # resource /dev/random

    def test_getrandom_nonblock(self):
        # The call must not fail. Check also that the flag exists
        try:
            os.getrandom(1, os.GRND_NONBLOCK)
        except BlockingIOError:
            # System urandom is not initialized yet
            pass

    def test_getrandom_value(self):
        first = os.getrandom(16)
        second = os.getrandom(16)
        self.assertNotEqual(first, second)
1482
1483
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# The flag is true when any one of the matching build-time config variables
# is set to 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in (
        'HAVE_GETENTROPY',
        'HAVE_GETRANDOM',
        'HAVE_GETRANDOM_SYSCALL',
    )
)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001490
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
@unittest.skipIf(sys.platform == "vxworks",
                 "VxWorks can't set RLIMIT_NOFILE to 1")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Every scenario runs in a child interpreter (assert_python_ok) because
    the scripts lower the descriptor limit or close descriptors wholesale.
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open its random
        # device.  We spawn a new process to make the test more robust (if
        # the file descriptor limit could not be restored after this, the
        # whole test suite would crash; this actually happened on the OS X
        # Tiger buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.  The child succeeding is the whole check, so the result
        # of assert_python_ok() is not inspected.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        # Constant file content: if urandom() read from it, the outputs
        # compared below would be identical.
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                new_fd = f.fileno()
                # Issue #26935: posix allows new_fd and fd to be equal but
                # some libc implementations have dup2 return an error in this
                # case.
                if new_fd != fd:
                    os.dup2(new_fd, fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # Within one run the two 4-byte reads must differ ...
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        # ... and a second run must not repeat the first run's output.
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1570
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001571
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is replaced by it for the duration
    of the context.  The context manager yields the list of recorded calls;
    os.execv, os.execve and os.defpath are restored on exit, including when
    the body raises.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Capture the originals before entering the try block: if one of these
    # lookups fails, the finally clause must not run and mask the real error
    # with an UnboundLocalError.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
1604
@unittest.skipUnless(hasattr(os, 'execv'),
                     "need os.execv()")
class ExecTests(unittest.TestCase):
    """Tests for the os.exec*() family and the internal os._execvpe()."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execv_with_bad_arglist(self):
        # An empty argument list, or an empty first argument, is rejected.
        for bad_args in ((), [], ('',), ['']):
            with self.assertRaises(ValueError):
                os.execv('notepad', bad_args)

    def test_execvpe_with_bad_arglist(self):
        for bad_args, environ in (([], None), ([], {}), ([''], {})):
            with self.assertRaises(ValueError):
                os.execvpe('notepad', bad_args, environ)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with str or bytes program names while
        # os.execv/os.execve are replaced by recording mocks.
        search_dir = os.sep + 'absolutepath'
        if test_type is bytes:
            exe_name = b'executable'
            exe_path = os.path.join(os.fsencode(search_dir), exe_name)
            expected_path = exe_path
            argv = [b'progname', 'arg1', 'arg2']
        else:
            exe_name = 'executable'
            argv = ['progname', 'arg1', 'arg2']
            exe_path = os.path.join(search_dir, exe_name)
            expected_path = exe_path if os.name == "nt" else os.fsencode(exe_path)
        environ = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as recorded:
            self.assertRaises(RuntimeError, os._execvpe, exe_path, argv)
            self.assertEqual(len(recorded), 1)
            self.assertEqual(recorded[0], ('execv', exe_path, (argv,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as recorded:
            self.assertRaises(OSError, os._execvpe, exe_name, argv,
                              env=environ)
            self.assertEqual(len(recorded), 1)
            self.assertSequenceEqual(
                recorded[0], ('execve', expected_path, (argv, environ)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        path_key = b'PATH' if test_type is bytes else 'PATH'
        with _execvpe_mockup() as recorded:
            environ_with_path = environ.copy()
            environ_with_path[path_key] = search_dir
            self.assertRaises(OSError, os._execvpe, exe_name, argv,
                              env=environ_with_path)
            self.assertEqual(len(recorded), 1)
            self.assertSequenceEqual(
                recorded[0],
                ('execve', expected_path, (argv, environ_with_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised off Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)

    def test_execve_invalid_env(self):
        argv = [sys.executable, '-c', 'pass']

        invalid_entries = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage"),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage"),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon"),
        )
        for key, value in invalid_entries:
            newenv = os.environ.copy()
            newenv[key] = value
            with self.assertRaises(ValueError):
                os.execve(argv[0], argv, newenv)

    @unittest.skipUnless(sys.platform == "win32", "Win32-specific test")
    def test_execve_with_empty_path(self):
        # bpo-32890: Check GetLastError() misuse
        try:
            os.execve('', ['arg'], {})
        except OSError as e:
            self.assertTrue(e.winerror is None or e.winerror != 0)
        else:
            self.fail('No OSError raised')
1709
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001710
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a path that does not exist."""

    def setUp(self):
        # Every test relies on support.TESTFN being absent; fail early with
        # a clear message if it exists or cannot be examined.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            # Expected: the path does not exist.
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Create the path as a regular file (mode "x" fails if it already
        # exists) and keep it open while mkdir() is attempted on it.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001745
Victor Stinnere77c9742016-03-25 10:28:23 +01001746
class TestInvalidFD(unittest.TestCase):
    # Each test calls an os function with a bad file descriptor obtained
    # from support.make_bad_fd() and, via check(), expects OSError with
    # errno EBADF.

    # Functions that take only a file descriptor; one test method per name
    # is generated by the loop below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build a test method for os.<f>.  When the os module has no such
        # attribute the generated test does nothing (it passes, it is not
        # reported as skipped).
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    # Runs while the class body executes: each generated method is stored
    # in the class namespace under the name "test_<function>".
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args); require OSError with errno EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() is expected to return False here rather than raise.
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        # After the loop, i is the offset of the first descriptor for which
        # fstat() succeeded, or 9 if all ten probes raised OSError.
        for i in range(10):
            try: os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        # NOTE(review): the skip condition only checks for os.fpathconf, but
        # os.pathconf is called as well.
        self.check(os.pathconf, "PC_NAME_MAX")
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        # NOTE(review): the skip condition only checks for os.ftruncate, but
        # os.truncate is called as well.
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        buf = bytearray(10)
        self.check(os.readv, [buf])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        # NOTE(review): the trailing "t" in the method name looks like a
        # typo for test_tcsetpgrp.
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1845
Brian Curtin1b9df392010-11-24 20:24:31 +00001846
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two names the test managed to create.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it to file2 and check that both names
        # refer to the same open file.
        create_file(file1)

        try:
            os.link(file1, file2)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding together with its
        # error handler, matching how the os module itself encodes names.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Update the attributes so tearDown() removes the renamed files.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1883
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Argument and privilege checks for the os.set*uid()/os.set*gid() family.

    Each test expects OSError when an unprivileged process asks for id 0,
    TypeError for a non-integer argument and OverflowError for a value that
    does not fit the id type.
    """

    # uid_t and gid_t are 32-bit unsigned integers on Linux
    UID_OVERFLOW = (1 << 32)
    GID_OVERFLOW = (1 << 32)

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setuid, 0)
        self.assertRaises(TypeError, os.setuid, 'not an int')
        self.assertRaises(OverflowError, os.setuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setgid, 0)
        self.assertRaises(TypeError, os.setgid, 'not an int')
        self.assertRaises(OverflowError, os.setgid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.seteuid, 0)
        # The type check must target seteuid(), the function under test.
        self.assertRaises(TypeError, os.seteuid, 'not an int')
        self.assertRaises(OverflowError, os.seteuid, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setegid, 0)
        self.assertRaises(TypeError, os.setegid, 'not an int')
        self.assertRaises(OverflowError, os.setegid, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            self.assertRaises(OSError, os.setreuid, 0, 0)
        # Both positions are validated independently.
        self.assertRaises(TypeError, os.setreuid, 'not an int', 0)
        self.assertRaises(TypeError, os.setreuid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setreuid, self.UID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setreuid, 0, self.UID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
            sys.executable, '-c',
            'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            self.assertRaises(OSError, os.setregid, 0, 0)
        # Both positions are validated independently.
        self.assertRaises(TypeError, os.setregid, 'not an int', 0)
        self.assertRaises(TypeError, os.setregid, 0, 'not an int')
        self.assertRaises(OverflowError, os.setregid, self.GID_OVERFLOW, 0)
        self.assertRaises(OverflowError, os.setregid, 0, self.GID_OVERFLOW)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1. We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        subprocess.check_call([
            sys.executable, '-c',
            'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001951
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Tests that os functions accept the file names created in setUp()."""

    def setUp(self):
        # Pick the first available test name, in order of preference.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Collect the candidate names that can be encoded to bytes.
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)
        encoded_names = []
        for candidate in candidates:
            try:
                encoded = os.fsencode(candidate)
            except UnicodeEncodeError:
                continue
            encoded_names.append(encoded)
        if not encoded_names:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in encoded_names:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so clean up here
            # before re-raising.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each file must succeed; the content is not inspected.
        for name in self.unicodefn:
            with open(os.path.join(self.dir, name), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for name in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, name))

    def test_stat(self):
        for name in self.unicodefn:
            os.stat(os.path.join(self.dir, name))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002023
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows (exit codes and console events)."""

    def _kill(self, sig):
        """Start a child interpreter, kill it with *sig*, check its exit code.

        The child writes a short message to stdout once it is running and
        then blocks on input().  When the message is visible in the pipe,
        *sig* is sent with os.kill() and the child's return code is
        expected to be equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for at most max_attempts * 0.1 seconds.
        count, max_attempts = 0, 100
        while count < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop ends without ``break``: the child
            # exited early or never produced the message.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and check that it exits.

        *name* is only used in the failure message.  The child runs
        win_console_handler.py and is given the tag name of a one-byte
        shared memory block; this method waits for that byte to become 1
        before sending the event.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test finishes, pass or fail.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_attempts = 0, 100
        while count < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit code of 0 is falsy but still means the child has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2138
2139
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with two sub-directories and two small files and
        # remember their (sorted) names for the listdir() comparisons.
        self.created_paths = []
        for index in range(2):
            sub_name = 'SUB%d' % index
            data_name = 'FILE%d' % index
            sub_path = os.path.join(support.TESTFN, sub_name)
            data_path = os.path.join(support.TESTFN, data_name)
            os.makedirs(sub_path)
            with open(data_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % data_path)
            self.created_paths.extend([sub_name, data_name])
        self.created_paths.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def _expected_bytes(self):
        # The created names encoded with the filesystem encoding.
        return [os.fsencode(name) for name in self.created_paths]

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, self.created_paths)

        # bytes
        listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
        self.assertEqual(listed, self._expected_bytes())

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        absolute = os.path.abspath(support.TESTFN)

        # unicode
        extended = '\\\\?\\' + absolute
        self.assertEqual(sorted(os.listdir(extended)), self.created_paths)

        # bytes
        extended = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
        self.assertEqual(sorted(os.listdir(extended)), self._expected_bytes())
Tim Golden781bbeb2013-10-25 20:24:06 +01002186
2187
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
    """Tests for os.readlink() with str, bytes and path-like arguments."""

    filelink = 'readlinktest'
    filelink_target = os.path.abspath(__file__)
    filelinkb = os.fsencode(filelink)
    filelinkb_target = os.fsencode(filelink_target)

    def setUp(self):
        # The targets must exist and the link names must be free.
        for target in (self.filelink_target, self.filelinkb_target):
            self.assertTrue(os.path.exists(target))
        for link in (self.filelink, self.filelinkb):
            self.assertFalse(os.path.exists(link))

    def test_not_symlink(self):
        pathlike_target = FakePath(self.filelink_target)
        for candidate in (self.filelink_target, pathlike_target):
            with self.assertRaises(OSError):
                os.readlink(candidate)

    def test_missing_link(self):
        with self.assertRaises(FileNotFoundError):
            os.readlink('missing-link')
        with self.assertRaises(FileNotFoundError):
            os.readlink(FakePath('missing-link'))

    @support.skip_unless_symlink
    def test_pathlike(self):
        os.symlink(self.filelink_target, self.filelink)
        self.addCleanup(support.unlink, self.filelink)
        resolved = os.readlink(FakePath(self.filelink))
        self.assertEqual(resolved, self.filelink_target)

    @support.skip_unless_symlink
    def test_pathlike_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(FakePath(self.filelinkb))
        self.assertEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)

    @support.skip_unless_symlink
    def test_bytes(self):
        os.symlink(self.filelinkb_target, self.filelinkb)
        self.addCleanup(support.unlink, self.filelinkb)
        resolved = os.readlink(self.filelinkb)
        self.assertEqual(resolved, self.filelinkb_target)
        self.assertIsInstance(resolved, bytes)
2232 self.assertIsInstance(path, bytes)
2233
2234
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic link support in os and os.path on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # unittest assertions are used instead of bare ``assert`` so that
        # the preconditions are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is needed here: the link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)

    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
                         and os.path.exists(r'C:\ProgramData'),
                         'Test directories not found')
    def test_29248(self):
        # os.symlink() calls CreateSymbolicLink, which creates
        # the reparse data buffer with the print name stored
        # first, so the offset is always 0. CreateSymbolicLink
        # stores the "PrintName" DOS path (e.g. "C:\") first,
        # with an offset of 0, followed by the "SubstituteName"
        # NT path (e.g. "\??\C:\"). The "All Users" link, on
        # the other hand, seems to have been created manually
        # with an inverted order.
        target = os.readlink(r'C:\Users\All Users')
        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))

    def _symlink_then_remove(self, src, dest, created):
        """Try os.symlink(src, dest); on success, best-effort remove *created*.

        FileNotFoundError from symlink() and OSError from remove() are
        ignored: the caller only cares that nothing crashes.
        """
        try:
            os.symlink(src, dest)
        except FileNotFoundError:
            return
        try:
            os.remove(created)
        except OSError:
            pass

    def test_buffer_overflow(self):
        # Older versions would have a buffer overflow when detecting
        # whether a link source was a directory. This test ensures we
        # no longer crash, but does not otherwise validate the behavior
        segment = 'X' * 27
        path = os.path.join(*[segment] * 10)
        test_cases = [
            # overflow with absolute src
            ('\\' + path, segment),
            # overflow dest with relative src
            (segment, path),
            # overflow when joining src
            (path[:180], path[:180]),
        ]
        for src, dest in test_cases:
            self._symlink_then_remove(src, dest, dest)
            # Also test with bytes, since that is a separate code path.
            self._symlink_then_remove(os.fsencode(src), os.fsencode(dest),
                                      dest)
Brian Curtind40e6f72010-07-08 21:39:08 +00002394
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for NTFS junction points created with _winapi.CreateJunction."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # unittest assertions are used instead of bare ``assert`` so that
        # the preconditions are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2424
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32NtTests(unittest.TestCase):
    """Tests for Windows-only helpers of the nt module."""

    def test_getfinalpathname_handles(self):
        """nt._getfinalpathname() and os.stat() must not leak handles."""
        nt = support.import_module('nt')
        ctypes = support.import_module('ctypes')
        import ctypes.wintypes

        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE

        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
                                                 ctypes.wintypes.LPDWORD)

        # This is a pseudo-handle that doesn't need to be closed
        hproc = kernel.GetCurrentProcess()

        handle_count = ctypes.wintypes.DWORD()
        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
        self.assertEqual(1, ok)

        before_count = handle_count.value

        # The first three names exercise the error path, __file__ the
        # success path.
        filenames = [
            r'\\?\C:',
            r'\\?\NUL',
            r'\\?\CONIN',
            __file__,
        ]

        for _ in range(10):
            for name in filenames:
                # Call both functions in turn; failure is expected for
                # some of the names and is deliberately ignored.
                for probe in (nt._getfinalpathname, os.stat):
                    try:
                        probe(name)
                    except Exception:
                        pass

        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
        self.assertEqual(1, ok)

        # The process must hold exactly as many handles as before.
        self.assertEqual(0, handle_count.value - before_count)
Tim Golden0321cf22014-05-05 19:46:17 +01002474
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Tests for symlinks whose target is relative to the link itself."""

    def setUp(self):
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A bare ``assert`` would be compiled away under -O and leave this
        # test without any check at all.
        self.assertTrue(os.path.isdir(src))
2505
2506
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() come
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), b'abc\xff')
        self.assertEqual(os.fsdecode(text), 'abc\u0141')

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        samples = ('unicode\u0141', 'latin\xe9', 'ascii')
        for sample in samples:
            try:
                encoded = os.fsencode(sample)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding: skip it.
                continue
            self.assertEqual(os.fsdecode(encoded), sample)
Victor Stinnere8d51452010-08-19 01:05:19 +00002520
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002521
Brett Cannonefb00c02012-02-29 18:31:31 -05002522
class DeviceEncodingTests(unittest.TestCase):
    """Tests for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        encoding = os.device_encoding(123456)
        self.assertIsNone(encoding)

    @unittest.skipUnless(
        os.isatty(0)
        and not win32_is_iot()
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # fd 0 is a tty here, so an encoding known to codecs is expected.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2536
2537
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002538class PidTests(unittest.TestCase):
2539 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2540 def test_getppid(self):
2541 p = subprocess.Popen([sys.executable, '-c',
2542 'import os; print(os.getppid())'],
2543 stdout=subprocess.PIPE)
2544 stdout, _ = p.communicate()
2545 # We are the parent of our subprocess
2546 self.assertEqual(int(stdout), os.getpid())
2547
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002548 def test_waitpid(self):
2549 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002550 # Add an implicit test for PyUnicode_FSConverter().
Serhiy Storchakab21d1552018-03-02 11:53:51 +02002551 pid = os.spawnv(os.P_NOWAIT, FakePath(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002552 status = os.waitpid(pid, 0)
2553 self.assertEqual(status, (pid, 0))
2554
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002555
Victor Stinner4659ccf2016-09-14 10:57:00 +02002556class SpawnTests(unittest.TestCase):
def create_args(self, *, with_env=False, use_bytes=False):
    """Write a small script to TESTFN and return the argv that runs it.

    The script exits with ``self.exitcode`` (set here to 17).

    with_env: also build ``self.env``, a copy of os.environ with one
        extra unique variable stored in ``self.key``; the script then
        reads that variable before exiting.
    use_bytes: encode the returned arguments and, when *with_env* is
        true, the keys and values of ``self.env`` with os.fsencode().
    """
    self.exitcode = 17

    filename = support.TESTFN
    self.addCleanup(support.unlink, filename)

    if not with_env:
        code = 'import sys; sys.exit(%s)' % self.exitcode
    else:
        self.env = dict(os.environ)
        # create an unique key
        self.key = str(uuid.uuid4())
        self.env[self.key] = self.key
        # read the variable from os.environ to check that it exists
        code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
                % (self.key, self.exitcode))

    with open(filename, "w") as fp:
        fp.write(code)

    args = [sys.executable, filename]
    if use_bytes:
        args = [os.fsencode(a) for a in args]
        # self.env only exists when with_env is true; encoding it
        # unconditionally raised AttributeError for
        # create_args(use_bytes=True).
        if with_env:
            self.env = {os.fsencode(k): os.fsencode(v)
                        for k, v in self.env.items()}

    return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002584
@requires_os_func('spawnl')
def test_spawnl(self):
    """os.spawnl() with P_WAIT returns the exit code of the child."""
    argv = self.create_args()
    program = argv[0]
    status = os.spawnl(os.P_WAIT, program, *argv)
    self.assertEqual(status, self.exitcode)
2590
@requires_os_func('spawnle')
def test_spawnle(self):
    """os.spawnle() with P_WAIT passes the environment and returns the exit code."""
    argv = self.create_args(with_env=True)
    program = argv[0]
    status = os.spawnle(os.P_WAIT, program, *argv, self.env)
    self.assertEqual(status, self.exitcode)
2596
@requires_os_func('spawnlp')
def test_spawnlp(self):
    """os.spawnlp() with P_WAIT returns the exit code of the child."""
    argv = self.create_args()
    program = argv[0]
    status = os.spawnlp(os.P_WAIT, program, *argv)
    self.assertEqual(status, self.exitcode)
2602
@requires_os_func('spawnlpe')
def test_spawnlpe(self):
    """os.spawnlpe() with P_WAIT passes the environment and returns the exit code."""
    argv = self.create_args(with_env=True)
    program = argv[0]
    status = os.spawnlpe(os.P_WAIT, program, *argv, self.env)
    self.assertEqual(status, self.exitcode)
2608
@requires_os_func('spawnv')
def test_spawnv(self):
    """os.spawnv() with P_WAIT returns the exit code of the child."""
    argv = self.create_args()
    program = argv[0]
    status = os.spawnv(os.P_WAIT, program, argv)
    self.assertEqual(status, self.exitcode)
2614
@requires_os_func('spawnve')
def test_spawnve(self):
    """os.spawnve() with P_WAIT passes the environment and returns the exit code."""
    argv = self.create_args(with_env=True)
    program = argv[0]
    status = os.spawnve(os.P_WAIT, program, argv, self.env)
    self.assertEqual(status, self.exitcode)
2620
@requires_os_func('spawnvp')
def test_spawnvp(self):
    """os.spawnvp() with P_WAIT returns the exit code of the child."""
    argv = self.create_args()
    program = argv[0]
    status = os.spawnvp(os.P_WAIT, program, argv)
    self.assertEqual(status, self.exitcode)
2626
@requires_os_func('spawnvpe')
def test_spawnvpe(self):
    """os.spawnvpe() with P_WAIT passes the environment and returns the exit code."""
    argv = self.create_args(with_env=True)
    program = argv[0]
    status = os.spawnvpe(os.P_WAIT, program, argv, self.env)
    self.assertEqual(status, self.exitcode)
2632
@requires_os_func('spawnv')
def test_nowait(self):
    """os.spawnv() with P_NOWAIT returns a pid that os.waitpid() can reap."""
    argv = self.create_args()
    pid = os.spawnv(os.P_NOWAIT, argv[0], argv)
    reaped = os.waitpid(pid, 0)
    self.assertEqual(reaped[0], pid)
    status = reaped[1]
    if not hasattr(os, 'WIFEXITED'):
        # Without the W* helpers the status is compared with the exit
        # code shifted left by 8 bits.
        self.assertEqual(status, self.exitcode << 8)
        return
    self.assertTrue(os.WIFEXITED(status))
    self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2645
@requires_os_func('spawnve')
def test_spawnve_bytes(self):
    """os.spawnve() accepts bytes arguments and a bytes environment."""
    # Test bytes handling in parse_arglist and parse_envlist (#28114)
    argv = self.create_args(with_env=True, use_bytes=True)
    program = argv[0]
    status = os.spawnve(os.P_WAIT, program, argv, self.env)
    self.assertEqual(status, self.exitcode)
2652
@requires_os_func('spawnl')
def test_spawnl_noargs(self):
    """os.spawnl() raises ValueError for a missing or empty first argument."""
    program = self.create_args()[0]
    for extra in ((), ('',)):
        with self.assertRaises(ValueError):
            os.spawnl(os.P_NOWAIT, program, *extra)
Steve Dower859fd7b2016-11-19 18:53:19 -08002658
@requires_os_func('spawnle')
def test_spawnle_noargs(self):
    """os.spawnle() raises ValueError for a missing or empty first argument."""
    program = self.create_args()[0]
    for extra in ((), ('',)):
        with self.assertRaises(ValueError):
            os.spawnle(os.P_NOWAIT, program, *extra, {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002664
@requires_os_func('spawnv')
def test_spawnv_noargs(self):
    """os.spawnv() raises ValueError for an empty argv or an empty argv[0]."""
    program = self.create_args()[0]
    for bad_argv in ((), [], ('',), ['']):
        with self.assertRaises(ValueError):
            os.spawnv(os.P_NOWAIT, program, bad_argv)
Steve Dower859fd7b2016-11-19 18:53:19 -08002672
@requires_os_func('spawnve')
def test_spawnve_noargs(self):
    """os.spawnve() raises ValueError for an empty argv or an empty argv[0]."""
    program = self.create_args()[0]
    for bad_argv in ((), [], ('',), ['']):
        with self.assertRaises(ValueError):
            os.spawnve(os.P_NOWAIT, program, bad_argv, {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002680
def _test_invalid_env(self, spawn):
    """Call *spawn* with invalid and valid environment entries.

    *spawn* is called as ``spawn(os.P_WAIT, path, args, env)``.  For each
    invalid entry it must either raise ValueError or return exit code
    127.  An '=' inside a value is valid and must reach the child intact.
    """
    args = [sys.executable, '-c', 'pass']

    invalid_entries = (
        # null character in the environment variable name
        ("FRUIT\0VEGETABLE", "cabbage"),
        # null character in the environment variable value
        ("FRUIT", "orange\0VEGETABLE=cabbage"),
        # equal character in the environment variable name
        ("FRUIT=ORANGE", "lemon"),
    )
    for variable, value in invalid_entries:
        newenv = os.environ.copy()
        newenv[variable] = value
        try:
            exitcode = spawn(os.P_WAIT, args[0], args, newenv)
        except ValueError:
            continue
        self.assertEqual(exitcode, 127)

    # equal character in the environment variable value
    filename = support.TESTFN
    self.addCleanup(support.unlink, filename)
    script = ('import sys, os\n'
              'if os.getenv("FRUIT") != "orange=lemon":\n'
              '    raise AssertionError')
    with open(filename, "w") as fp:
        fp.write(script)
    args = [sys.executable, filename]
    newenv = os.environ.copy()
    newenv["FRUIT"] = "orange=lemon"
    self.assertEqual(spawn(os.P_WAIT, args[0], args, newenv), 0)
2726
@requires_os_func('spawnve')
def test_spawnve_invalid_env(self):
    # Run the shared invalid-environment checks against os.spawnve().
    spawn = os.spawnve
    self._test_invalid_env(spawn)
2730
@requires_os_func('spawnvpe')
def test_spawnvpe_invalid_env(self):
    # Run the shared invalid-environment checks against os.spawnvpe().
    spawn = os.spawnvpe
    self._test_invalid_env(spawn)
2734
Serhiy Storchaka77703942017-06-25 07:33:01 +03002735
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    # Smoke test for os.getlogin(); the outer decorator currently skips it
    # unconditionally.

    def test_getlogin(self):
        # The reported login name must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2744
2745
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Bump the priority value of the current process by one, read it
        # back, and finally try to put the original value back.
        which, who = os.PRIO_PROCESS, os.getpid()
        original = os.getpriority(which, who)
        os.setpriority(which, who, original + 1)
        try:
            observed = os.getpriority(which, who)
            if original >= 19 and observed <= 19:
                # Starting at 19 or above and still reading 19 or below:
                # the increment cannot be observed, so skip.
                self.skipTest("unable to reliably test setpriority "
                              "at current nice level of %s" % original)
            self.assertEqual(observed, original + 1)
        finally:
            try:
                os.setpriority(which, who, original)
            except OSError as exc:
                # EACCES while restoring is tolerated; any other error
                # propagates.
                if exc.errno != errno.EACCES:
                    raise
2768
2769
class SendfileTestServer(asyncore.dispatcher, threading.Thread):
    """Listening socket whose asyncore loop runs in its own thread.

    Each accepted connection is wrapped in a ``Handler`` which greets the
    peer and (optionally) accumulates everything it receives so that a test
    can inspect it afterwards via ``handler_instance.get_data()``.
    """

    class Handler(asynchat.async_chat):
        """Per-connection channel that records incoming bytes."""

        def __init__(self, conn):
            asynchat.async_chat.__init__(self, conn)
            self.in_buffer = []
            # When False, received data is read but not stored.
            self.accumulate = True
            self.closed = False
            self.push(b"220 ready\r\n")

        def handle_read(self):
            data = self.recv(4096)
            if self.accumulate:
                self.in_buffer.append(data)

        def get_data(self):
            """Return all accumulated bytes as one bytes object."""
            return b''.join(self.in_buffer)

        def handle_close(self):
            self.close()
            self.closed = True

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, address):
        """Bind a listening TCP socket to *address* (a (host, port) pair)."""
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # Record the address actually bound (the first two items of
        # getsockname()).
        self.host, self.port = self.socket.getsockname()[:2]
        self.handler_instance = None
        self._active = False
        self._active_lock = threading.Lock()

    # --- public API

    @property
    def running(self):
        """True between the start of run() and a call to stop()."""
        return self._active

    def start(self):
        """Start the server thread and block until run() has begun."""
        assert not self.running
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def stop(self):
        """Ask the loop to finish and wait for the thread to exit."""
        assert self.running
        self._active = False
        self.join()

    def wait(self):
        # wait for handler connection to be closed, then stop the server
        while not getattr(self.handler_instance, "closed", False):
            time.sleep(0.001)
        self.stop()

    # --- internals

    def run(self):
        self._active = True
        self.__flag.set()
        while self._active and asyncore.socket_map:
            # Use the lock as a context manager so that it is released even
            # if asyncore.loop() raises (the handle_error() overrides in
            # this class re-raise instead of swallowing errors).
            with self._active_lock:
                asyncore.loop(timeout=0.001, count=1)
        asyncore.close_all()

    def handle_accept(self):
        conn, addr = self.accept()
        self.handler_instance = self.Handler(conn)

    def handle_connect(self):
        self.close()
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
2854
2855
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002856@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2857class TestSendfile(unittest.TestCase):
2858
Victor Stinner8c663fd2017-11-08 14:44:44 -08002859 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002860 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002861 not sys.platform.startswith("solaris") and \
2862 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002863 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2864 'requires headers and trailers support')
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002865 requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
2866 'test is only meaningful on 32-bit builds')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002867
2868 @classmethod
2869 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002870 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002871 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002872
2873 @classmethod
2874 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002875 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002876 support.unlink(support.TESTFN)
2877
2878 def setUp(self):
2879 self.server = SendfileTestServer((support.HOST, 0))
2880 self.server.start()
2881 self.client = socket.socket()
2882 self.client.connect((self.server.host, self.server.port))
2883 self.client.settimeout(1)
2884 # synchronize by waiting for "220 ready" response
2885 self.client.recv(1024)
2886 self.sockno = self.client.fileno()
2887 self.file = open(support.TESTFN, 'rb')
2888 self.fileno = self.file.fileno()
2889
2890 def tearDown(self):
2891 self.file.close()
2892 self.client.close()
2893 if self.server.running:
2894 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02002895 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002896
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002897 def sendfile_wrapper(self, *args, **kwargs):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002898 """A higher level wrapper representing how an application is
2899 supposed to use sendfile().
2900 """
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002901 while True:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002902 try:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002903 return os.sendfile(*args, **kwargs)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002904 except OSError as err:
2905 if err.errno == errno.ECONNRESET:
2906 # disconnected
2907 raise
2908 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2909 # we have to retry send data
2910 continue
2911 else:
2912 raise
2913
2914 def test_send_whole_file(self):
2915 # normal send
2916 total_sent = 0
2917 offset = 0
2918 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002919 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002920 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2921 if sent == 0:
2922 break
2923 offset += sent
2924 total_sent += sent
2925 self.assertTrue(sent <= nbytes)
2926 self.assertEqual(offset, total_sent)
2927
2928 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002929 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002930 self.client.close()
2931 self.server.wait()
2932 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002933 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002934 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002935
2936 def test_send_at_certain_offset(self):
2937 # start sending a file at a certain offset
2938 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002939 offset = len(self.DATA) // 2
2940 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002941 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002942 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002943 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2944 if sent == 0:
2945 break
2946 offset += sent
2947 total_sent += sent
2948 self.assertTrue(sent <= nbytes)
2949
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002950 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002951 self.client.close()
2952 self.server.wait()
2953 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002954 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002955 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002956 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002957 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002958
2959 def test_offset_overflow(self):
2960 # specify an offset > file size
2961 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002962 try:
2963 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2964 except OSError as e:
2965 # Solaris can raise EINVAL if offset >= file length, ignore.
2966 if e.errno != errno.EINVAL:
2967 raise
2968 else:
2969 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002970 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002971 self.client.close()
2972 self.server.wait()
2973 data = self.server.handler_instance.get_data()
2974 self.assertEqual(data, b'')
2975
2976 def test_invalid_offset(self):
2977 with self.assertRaises(OSError) as cm:
2978 os.sendfile(self.sockno, self.fileno, -1, 4096)
2979 self.assertEqual(cm.exception.errno, errno.EINVAL)
2980
Martin Panterbf19d162015-09-09 01:01:13 +00002981 def test_keywords(self):
2982 # Keyword arguments should be supported
2983 os.sendfile(out=self.sockno, offset=0, count=4096,
2984 **{'in': self.fileno})
2985 if self.SUPPORT_HEADERS_TRAILERS:
2986 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002987 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002988
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002989 # --- headers / trailers tests
2990
Serhiy Storchaka43767632013-11-03 21:31:38 +02002991 @requires_headers_trailers
2992 def test_headers(self):
2993 total_sent = 0
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002994 expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
Serhiy Storchaka43767632013-11-03 21:31:38 +02002995 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
Serhiy Storchaka9d572732018-07-31 10:24:54 +03002996 headers=[b"x" * 512, b"y" * 256])
2997 self.assertLessEqual(sent, 512 + 256 + 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02002998 total_sent += sent
2999 offset = 4096
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003000 while total_sent < len(expected_data):
3001 nbytes = min(len(expected_data) - total_sent, 4096)
Serhiy Storchaka43767632013-11-03 21:31:38 +02003002 sent = self.sendfile_wrapper(self.sockno, self.fileno,
3003 offset, nbytes)
3004 if sent == 0:
3005 break
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003006 self.assertLessEqual(sent, nbytes)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003007 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02003008 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003009
Serhiy Storchaka43767632013-11-03 21:31:38 +02003010 self.assertEqual(total_sent, len(expected_data))
3011 self.client.close()
3012 self.server.wait()
3013 data = self.server.handler_instance.get_data()
3014 self.assertEqual(hash(data), hash(expected_data))
3015
3016 @requires_headers_trailers
3017 def test_trailers(self):
3018 TESTFN2 = support.TESTFN + "2"
3019 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01003020
3021 self.addCleanup(support.unlink, TESTFN2)
3022 create_file(TESTFN2, file_data)
3023
3024 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003025 os.sendfile(self.sockno, f.fileno(), 0, 5,
3026 trailers=[b"123456", b"789"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003027 self.client.close()
3028 self.server.wait()
3029 data = self.server.handler_instance.get_data()
Serhiy Storchaka9d572732018-07-31 10:24:54 +03003030 self.assertEqual(data, b"abcde123456789")
3031
3032 @requires_headers_trailers
3033 @requires_32b
3034 def test_headers_overflow_32bits(self):
3035 self.server.handler_instance.accumulate = False
3036 with self.assertRaises(OSError) as cm:
3037 os.sendfile(self.sockno, self.fileno, 0, 0,
3038 headers=[b"x" * 2**16] * 2**15)
3039 self.assertEqual(cm.exception.errno, errno.EINVAL)
3040
3041 @requires_headers_trailers
3042 @requires_32b
3043 def test_trailers_overflow_32bits(self):
3044 self.server.handler_instance.accumulate = False
3045 with self.assertRaises(OSError) as cm:
3046 os.sendfile(self.sockno, self.fileno, 0, 0,
3047 trailers=[b"x" * 2**16] * 2**15)
3048 self.assertEqual(cm.exception.errno, errno.EINVAL)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003049
Serhiy Storchaka43767632013-11-03 21:31:38 +02003050 @requires_headers_trailers
3051 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
3052 'test needs os.SF_NODISKIO')
3053 def test_flags(self):
3054 try:
3055 os.sendfile(self.sockno, self.fileno, 0, 4096,
3056 flags=os.SF_NODISKIO)
3057 except OSError as err:
3058 if err.errno not in (errno.EBUSY, errno.EAGAIN):
3059 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00003060
3061
def supports_extended_attributes():
    """Return True if os.setxattr() succeeds on a newly created test file.

    False is returned when os.setxattr does not exist or when setting a
    "user.test" attribute on the probe file raises OSError.  The probe file
    is always removed afterwards.
    """
    if not hasattr(os, "setxattr"):
        return False

    usable = True
    try:
        # "x" mode: the probe file must not already exist.
        with open(support.TESTFN, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                usable = False
    finally:
        support.unlink(support.TESTFN)

    return usable
Larry Hastings9cf065c2012-06-22 16:30:09 -07003076
3077
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    # Drives the get/set/remove/list xattr functions through one shared
    # scenario, once with str attribute names and once with bytes names.

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the xattr scenario on a fresh test file.

        *s* converts an attribute name to the type under test; the four
        callables are the implementations to exercise; **kwargs is forwarded
        to getxattr/setxattr/removexattr.
        """
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        def expect_errno(code, func, *args):
            # func(path, *args, **kwargs) must fail with OSError(code).
            with self.assertRaises(OSError) as caught:
                func(path, *args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        # Nothing has been set yet.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))

        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create the attribute with an empty value, then replace the value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name and XATTR_REPLACE on a missing
        # name must both fail.
        expect_errno(errno.EEXIST, setxattr,
                     s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr,
                     s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, s("user.test"), **kwargs)

        # The removed attribute is gone; the other one is untouched.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)
        # Finally set one hundred attributes and check they are all listed.
        many = sorted("user.test{}".format(i) for i in range(100))
        for name in many:
            setxattr(path, name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        """Run the scenario with str names, then again with bytes names."""
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call goes through a file descriptor
        # opened on the path instead of the path itself.
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.setxattr(fp.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.removexattr(fp.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
3161
3162
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def _skip_if_expected_failure(self, exc):
        """Skip the running test when *exc* is a tolerated OSError.

        Returns normally otherwise, so the caller can re-raise.
        """
        tolerated = (sys.platform == "win32"
                     or exc.errno in (errno.EINVAL, errno.ENOTTY))
        if tolerated:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            self.skipTest("failed to query terminal size")

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            dimensions = os.get_terminal_size()
        except OSError as exc:
            self._skip_if_expected_failure(exc)
            raise

        self.assertGreaterEqual(dimensions.columns, 0)
        self.assertGreaterEqual(dimensions.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly. If stty succeeded, then get_terminal_size()
        should work too.
        """
        stty_failures = (FileNotFoundError, subprocess.CalledProcessError,
                         PermissionError)
        try:
            output = subprocess.check_output(['stty', 'size'])
            fields = output.decode().split()
        except stty_failures:
            self.skipTest("stty invocation failed")
        expected = (int(fields[1]), int(fields[0]))  # reversed order

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            self._skip_if_expected_failure(exc)
            raise
        self.assertEqual(expected, actual)
3206
3207
@unittest.skipUnless(hasattr(os, 'memfd_create'), 'requires os.memfd_create')
@support.requires_linux_version(3, 17)
class MemfdCreateTests(unittest.TestCase):
    def test_memfd_create(self):
        # Descriptor created with an explicit MFD_CLOEXEC flag: it must be
        # valid, non-inheritable and writable.
        flagged = os.memfd_create("Hi", os.MFD_CLOEXEC)
        self.assertNotEqual(flagged, -1)
        self.addCleanup(os.close, flagged)
        self.assertFalse(os.get_inheritable(flagged))
        # closefd=False: the descriptor is closed by the cleanup above.
        with open(flagged, "wb", closefd=False) as stream:
            stream.write(b'memfd_create')
            self.assertEqual(stream.tell(), 12)

        # Descriptor created without passing any flags: it must be
        # non-inheritable as well.
        unflagged = os.memfd_create("Hi")
        self.addCleanup(os.close, unflagged)
        self.assertFalse(os.get_inheritable(unflagged))
3223
3224
class OSErrorTests(unittest.TestCase):
    # Checks that path-taking os functions, when they fail with OSError,
    # report the very object that was passed in as ``filename``.

    def setUp(self):
        class Str(str):
            pass

        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        # A plain str and a str subclass instance.
        self.unicode_filenames = [decoded, Str(decoded)]

        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)
        # bytes plus two other bytes-like objects.
        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames

        # Each entry: (names to try, function, extra positional args...).
        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                (as_text, os.listdir, ),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Functions that only exist on some platforms.
        optional = (
            ("chown", 0, 0),
            ("lchown", 0, 0),
            ("truncate", 0),
            ("chflags", 0),
            ("lchflags", 0),
            ("chroot",),
        )
        for attr, *extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr), *extra))

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((as_text if on_windows else every, os.readlink,))

        for names, func, *func_args in funcs:
            for name in names:
                plain = isinstance(name, (str, bytes))
                try:
                    if plain:
                        func(name, *func_args)
                    else:
                        # Other path types must trigger a DeprecationWarning.
                        with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                            func(name, *func_args)
                except OSError as err:
                    self.assertIs(err.filename, name, str(func))
                except UnicodeDecodeError:
                    pass
                else:
                    self.fail("No exception thrown by {}".format(func))
3320
class CPUCountTests(unittest.TestCase):
    def test_cpu_count(self):
        # os.cpu_count() returns either None (the test is then skipped)
        # or a strictly positive int.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
3329
Victor Stinnerdaf45552013-08-28 00:53:59 +02003330
class FDInheritanceTests(unittest.TestCase):
    """Check the inheritable flag of file descriptors created through os.

    Descriptors returned by os.open(), os.pipe(), os.dup() and os.openpty()
    are expected to be non-inheritable, while os.dup2() is expected to
    produce an inheritable descriptor unless told otherwise.
    """

    def _open_this_file(self):
        # Open this source file read-only and make sure the descriptor is
        # closed again when the running test finishes.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag behind os' back; get_inheritable() must
        # notice the change
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, fd_flags & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        cloexec = fcntl.FD_CLOEXEC
        fd = self._open_this_file()
        # A freshly opened descriptor has FD_CLOEXEC set ...
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec, cloexec)

        # ... and set_inheritable(True) must clear it.
        os.set_inheritable(fd, True)
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec, 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        for end in (read_end, write_end):
            self.assertEqual(os.get_inheritable(end), False)

    def test_dup(self):
        source_fd = self._open_this_file()

        copy_fd = os.dup(source_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertEqual(os.get_inheritable(copy_fd), False)

    @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test')
    def test_dup_nul(self):
        # os.dup() was creating inheritable fds for character files.
        source_fd = os.open('NUL', os.O_RDONLY)
        self.addCleanup(os.close, source_fd)
        copy_fd = os.dup(source_fd)
        self.addCleanup(os.close, copy_fd)
        self.assertFalse(os.get_inheritable(copy_fd))

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source_fd = self._open_this_file()

        # inheritable by default
        default_target = self._open_this_file()
        self.assertEqual(os.dup2(source_fd, default_target), default_target)
        self.assertTrue(os.get_inheritable(default_target))

        # force non-inheritable
        private_target = self._open_this_file()
        self.assertEqual(
            os.dup2(source_fd, private_target, inheritable=False),
            private_target)
        self.assertFalse(os.get_inheritable(private_target))

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        for pty_fd in (master_fd, slave_fd):
            self.assertEqual(os.get_inheritable(pty_fd), False)
3418
3419
class PathTConverterTests(unittest.TestCase):
    """Exercise how a selection of os functions convert their path argument.

    Each function is called with str, bytes and path-like arguments, with a
    path-like object whose __fspath__() result is not str/bytes, and with a
    raw file descriptor.
    """

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', False, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        str_filename = support.TESTFN
        if os.name == 'nt':
            # bytes paths are not exercised on Windows
            bytes_fspath = bytes_filename = None
        else:
            # Use the filesystem encoding rather than hard-coding ASCII so
            # that a non-ASCII temporary name cannot break the test with a
            # UnicodeEncodeError.
            bytes_filename = os.fsencode(support.TESTFN)
            bytes_fspath = FakePath(bytes_filename)
        fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT)
        # Cleanups run in reverse order: the descriptor is closed first,
        # then the file is removed.
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = FakePath(fd)
        str_fspath = FakePath(str_filename)
        candidate_paths = [
            path
            for path in (str_filename, bytes_filename, str_fspath,
                         bytes_fspath)
            if path is not None
        ]

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                # Functions missing on this platform are silently skipped.
                try:
                    fn = getattr(os, name)
                except AttributeError:
                    continue

                # Every supported path flavour must be accepted.
                for path in candidate_paths:
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # A path-like object wrapping an int must be rejected.
                with self.assertRaisesRegex(
                        TypeError, 'to return str or bytes'):
                    fn(int_fspath, *extra_args)

                if allow_fd:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)
                else:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)

    def test_path_t_converter_and_custom_class(self):
        # The error message must name the offending type that was wrapped
        # in the path-like object.
        msg = r'__fspath__\(\) to return str or bytes, not %s'
        with self.assertRaisesRegex(TypeError, msg % r'int'):
            os.stat(FakePath(2))
        with self.assertRaisesRegex(TypeError, msg % r'float'):
            os.stat(FakePath(2.34))
        with self.assertRaisesRegex(TypeError, msg % r'object'):
            os.stat(FakePath(object()))
3484
Brett Cannon3f9183b2016-08-26 14:44:48 -07003485
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip the blocking mode of a descriptor through os."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor is expected to be in blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch the mode off and back on, checking it is reported back.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
3499
3500
Yury Selivanov97e2e062014-09-26 12:33:06 -04003501
class ExportsTests(unittest.TestCase):
    """Spot-check the public names listed in os.__all__."""

    def test_os_all(self):
        exported = os.__all__
        for public_name in ('open', 'walk'):
            self.assertIn(public_name, exported)
3506
3507
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields.

    Every test runs against a scratch directory (self.path, with a bytes
    twin in self.bytes_path) that setUp() creates and a registered cleanup
    removes again.
    """

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        scratch = os.path.realpath(support.TESTFN)
        self.path = scratch
        self.bytes_path = os.fsencode(scratch)
        self.addCleanup(support.rmtree, scratch)
        os.mkdir(scratch)

    def _directory_for(self, name):
        # Pick the scratch directory spelling (bytes or str) that matches
        # the type of *name*, so the two can be joined or compared.
        if isinstance(name, bytes):
            return self.bytes_path
        return self.path

    def _create_two_files(self):
        # Several iterator tests need more than one entry so that the
        # iterator is still unexhausted after a single next().
        self.create_file("file.txt")
        self.create_file("file2.txt")

    def create_file(self, name="file.txt"):
        """Create *name* in the scratch directory and return its full path."""
        filename = os.path.join(self._directory_for(name), name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return a name -> DirEntry mapping.

        The sorted entry names must equal *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Compare two stat results.

        With a true *skip_fields* only the "st_*" attributes are compared,
        leaving out st_dev, st_ino and st_nlink; otherwise the two objects
        must compare equal as a whole.
        """
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return

        ignored = ("st_dev", "st_ino", "st_nlink")
        for attr in dir(stat1):
            if attr.startswith("st_") and attr not in ignored:
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Cross-check a DirEntry against os.stat() and os.path.

        *is_dir* and *is_file* are accepted but not compared directly: the
        expected values are derived from os.stat() on the entry's path.
        *is_symlink* only selects how strictly the followed stat result is
        compared on Windows.
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Following symlinks.
        followed = os.stat(entry.path)
        self.assertEqual(entry.is_dir(), stat.S_ISDIR(followed.st_mode))
        self.assertEqual(entry.is_file(), stat.S_ISREG(followed.st_mode))
        self.assertEqual(entry.is_symlink(), os.path.islink(entry.path))

        # Not following symlinks.
        unfollowed = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(unfollowed.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(unfollowed.st_mode))

        on_windows = os.name == 'nt'
        self.assert_stat_equal(entry.stat(),
                               followed,
                               on_windows and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               unfollowed,
                               on_windows)

    def test_attributes(self):
        have_link = hasattr(os, 'link')
        have_symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")

        # Rows of (name, is_dir, is_file, is_symlink).  They are appended
        # in sorted-name order because get_entries() compares the names
        # against the sorted directory listing.
        expected = [
            ('dir', True, False, False),
            ('file.txt', False, True, False),
        ]
        if have_link:
            try:
                os.link(filename, os.path.join(self.path, "link_file.txt"))
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
            expected.append(('link_file.txt', False, True, False))
        if have_symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
            expected.append(('symlink_dir', True, False, True))
            expected.append(('symlink_file.txt', False, True, True))

        entries = self.get_entries([row[0] for row in expected])
        for name, is_dir, is_file, is_symlink in expected:
            self.check_entry(entries[name], name, is_dir, is_file, is_symlink)

    def get_entry(self, name):
        """Return the only entry of the scratch directory, named *name*."""
        found = list(os.scandir(self._directory_for(name)))
        self.assertEqual(len(found), 1)

        only = found[0]
        self.assertEqual(only.name, name)
        return only

    def create_file_entry(self, name='file.txt'):
        """Create a file and return the DirEntry that scandir() gives it."""
        created = self.create_file(name=name)
        return self.get_entry(os.path.basename(created))

    def test_current_directory(self):
        filename = self.create_file()
        previous_cwd = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            listed = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(listed),
                             [os.path.basename(filename)])
        finally:
            os.chdir(previous_cwd)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        expected = os.path.join(self.path, 'file.txt')
        self.assertEqual(os.fspath(entry), expected)

    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        expected = os.path.join(os.fsencode(self.path), bytes_filename)
        self.assertEqual(fspath, expected)

    def _check_vanished_entry(self, entry):
        # Shared tail of the test_removed_* tests: what inode() and stat()
        # are expected to do once the entry's target has been deleted.
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            with self.assertRaises(FileNotFoundError):
                entry.stat()
            with self.assertRaises(FileNotFoundError):
                entry.stat(follow_symlinks=False)

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self._check_vanished_entry(entry)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self._check_vanished_entry(entry)

    def test_broken_symlink(self):
        if not support.can_symlink():
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        link_entry = entries['symlink.txt']
        # Removing the target turns the symlink into a dangling one.
        os.unlink(filename)

        self.assertGreater(link_entry.inode(), 0)
        self.assertFalse(link_entry.is_dir())
        self.assertFalse(link_entry.is_file())  # broken symlink returns False
        self.assertFalse(link_entry.is_dir(follow_symlinks=False))
        self.assertFalse(link_entry.is_file(follow_symlinks=False))
        self.assertTrue(link_entry.is_symlink())
        with self.assertRaises(FileNotFoundError):
            link_entry.stat()
        # don't fail
        link_entry.stat(follow_symlinks=False)

    def test_bytes(self):
        self.create_file("file.txt")

        found = list(os.scandir(os.fsencode(self.path)))
        self.assertEqual(len(found), 1, found)
        entry = found[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_bytes_like(self):
        self.create_file("file.txt")
        expected_path = os.fsencode(os.path.join(self.path, 'file.txt'))

        for cls in bytearray, memoryview:
            path_bytes = cls(os.fsencode(self.path))
            # Bytes-like objects other than bytes are accepted, but only
            # with a DeprecationWarning.
            with self.assertWarns(DeprecationWarning):
                found = list(os.scandir(path_bytes))
            self.assertEqual(len(found), 1, found)
            entry = found[0]

            self.assertEqual(entry.name, b'file.txt')
            self.assertEqual(entry.path, expected_path)
            # The results must be real bytes, not the input type.
            self.assertIs(type(entry.name), bytes)
            self.assertIs(type(entry.path), bytes)

    @unittest.skipUnless(os.listdir in os.supports_fd,
                         'fd support for listdir required for this test.')
    def test_fd(self):
        self.assertIn(os.scandir, os.supports_fd)
        self.create_file('file.txt')
        expected_names = ['file.txt']
        if support.can_symlink():
            os.symlink('file.txt', os.path.join(self.path, 'link'))
            expected_names.append('link')

        fd = os.open(self.path, os.O_RDONLY)
        try:
            with os.scandir(fd) as it:
                entries = list(it)
            names = [entry.name for entry in entries]
            self.assertEqual(sorted(names), expected_names)
            self.assertEqual(names, os.listdir(fd))
            stat_takes_dir_fd = os.stat in os.supports_dir_fd
            for entry in entries:
                # When scanning a descriptor the entry path is just the name.
                self.assertEqual(entry.path, entry.name)
                self.assertEqual(os.fspath(entry), entry.name)
                self.assertEqual(entry.is_symlink(), entry.name == 'link')
                if stat_takes_dir_fd:
                    st = os.stat(entry.name, dir_fd=fd)
                    self.assertEqual(entry.stat(), st)
                    st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
                    self.assertEqual(entry.stat(follow_symlinks=False), st)
        finally:
            os.close(fd)

    def test_empty_path(self):
        with self.assertRaises(FileNotFoundError):
            os.scandir('')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        first_pass = list(iterator)
        self.assertEqual(len(first_pass), 1, first_pass)

        # check that consuming the iterator twice doesn't raise exception
        second_pass = list(iterator)
        self.assertEqual(len(second_pass), 0, second_pass)

    def test_bad_path_type(self):
        for bad_path in [1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, bad_path)

    def test_close(self):
        self._create_two_files()
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self._create_two_files()
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self._create_two_files()
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self._create_two_files()
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self._create_two_files()
        # Dropping a partially consumed, unclosed iterator must warn.
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3840
Victor Stinner6036e442015-03-08 01:58:04 +01003841
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the os.PathLike protocol."""

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    def test_return_bytes(self):
        # bytes come back unchanged
        for raw in b'hello', b'goodbye', b'some/path/and/file':
            self.assertEqual(raw, self.fspath(raw))

    def test_return_string(self):
        # str comes back unchanged
        for text in 'hello', 'goodbye', 'some/path/and/file':
            self.assertEqual(text, self.fspath(text))

    def test_fsencode_fsdecode(self):
        for wrapped in "path/like/object", b"path/like/object":
            pathlike = FakePath(wrapped)

            self.assertEqual(wrapped, self.fspath(pathlike))
            # fsencode()/fsdecode() accept the path-like object too and
            # normalise to bytes/str regardless of the wrapped type.
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        tag = '#feelthegil'
        self.assertEqual(tag, self.fspath(FakePath(tag)))
        self.assertTrue(issubclass(FakePath, os.PathLike))
        self.assertTrue(isinstance(FakePath('x'), os.PathLike))

    def test_garbage_in_exception_out(self):
        # Objects without __fspath__ that are neither str nor bytes.
        vapor = type('blah', (), {})
        for garbage in int, type, os, vapor():
            with self.assertRaises(TypeError):
                self.fspath(garbage)

    def test_argument_required(self):
        with self.assertRaises(TypeError):
            self.fspath()

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        with self.assertRaises(TypeError):
            self.fspath(FakePath(42))
        # __fspath__ attribute that is not callable.
        not_callable = type('foo', (), {'__fspath__': 1})
        with self.assertRaises(TypeError):
            self.fspath(not_callable())
        # __fspath__ raises an exception.
        with self.assertRaises(ZeroDivisionError):
            self.fspath(FakePath(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003887
Victor Stinnerc29b5852017-11-02 07:28:27 -07003888
class TimesTests(unittest.TestCase):
    """Check the shape of the value returned by os.times()."""

    def test_times(self):
        result = os.times()
        self.assertIsInstance(result, os.times_result)

        # Every named field must be a float.
        field_names = ('user', 'system', 'children_user', 'children_system',
                       'elapsed')
        for field in field_names:
            self.assertIsInstance(getattr(result, field), float)

        if os.name == 'nt':
            # These three fields are expected to be zero on Windows.
            for field in ('children_user', 'children_system', 'elapsed'):
                self.assertEqual(getattr(result, field), 0)
3903
3904
# os._fspath is only present when a C version of os.fspath() is provided;
# without it TestPEP519 has already exercised the pure Python implementation,
# so the subclass below would merely repeat the same tests.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Re-run every inherited test against the Python-level function.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003913
3914
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()