blob: 601c6b2e97c3cc14c8cffbd56d8fffc3251db73c [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000027try:
28 import threading
29except ImportError:
30 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020031try:
32 import resource
33except ImportError:
34 resource = None
35
Georg Brandl2daf6ae2012-02-20 19:54:16 +010036from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000037
Victor Stinner034d0aa2012-06-05 01:22:15 +020038with warnings.catch_warnings():
39 warnings.simplefilter("ignore", DeprecationWarning)
40 os.stat_float_times(True)
Victor Stinner1aa54a42012-02-08 04:09:37 +010041st = os.stat(__file__)
42stat_supports_subsecond = (
43 # check if float and int timestamps are different
44 (st.st_atime != st[7])
45 or (st.st_mtime != st[8])
46 or (st.st_ctime != st[9]))
47
Mark Dickinson7cf03892010-04-16 13:45:35 +000048# Detect whether we're on a Linux system that uses the (now outdated
49# and unmaintained) linuxthreads threading library. There's an issue
50# when combining linuxthreads with a failed execv call: see
51# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020052if hasattr(sys, 'thread_info') and sys.thread_info.version:
53 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
54else:
55 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000056
Stefan Krahebee49a2013-01-17 15:31:00 +010057# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
58HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
59
Thomas Wouters0e3f5912006-08-11 14:57:12 +000060# Tests creating TESTFN
61class FileTests(unittest.TestCase):
62 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000063 if os.path.exists(support.TESTFN):
64 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000065 tearDown = setUp
66
67 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000068 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000069 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000070 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000071
Christian Heimesfdab48e2008-01-20 09:06:41 +000072 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000073 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
74 # We must allocate two consecutive file descriptors, otherwise
75 # it will mess up other file descriptors (perhaps even the three
76 # standard ones).
77 second = os.dup(first)
78 try:
79 retries = 0
80 while second != first + 1:
81 os.close(first)
82 retries += 1
83 if retries > 10:
84 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000085 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000086 first, second = second, os.dup(second)
87 finally:
88 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000089 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000090 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000091 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000092
Benjamin Peterson1cc6df92010-06-30 17:39:45 +000093 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +000094 def test_rename(self):
95 path = support.TESTFN
96 old = sys.getrefcount(path)
97 self.assertRaises(TypeError, os.rename, path, 0)
98 new = sys.getrefcount(path)
99 self.assertEqual(old, new)
100
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000101 def test_read(self):
102 with open(support.TESTFN, "w+b") as fobj:
103 fobj.write(b"spam")
104 fobj.flush()
105 fd = fobj.fileno()
106 os.lseek(fd, 0, 0)
107 s = os.read(fd, 4)
108 self.assertEqual(type(s), bytes)
109 self.assertEqual(s, b"spam")
110
111 def test_write(self):
112 # os.write() accepts bytes- and buffer-like objects but not strings
113 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
114 self.assertRaises(TypeError, os.write, fd, "beans")
115 os.write(fd, b"bacon\n")
116 os.write(fd, bytearray(b"eggs\n"))
117 os.write(fd, memoryview(b"spam\n"))
118 os.close(fd)
119 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000120 self.assertEqual(fobj.read().splitlines(),
121 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000122
Victor Stinnere0daff12011-03-20 23:36:35 +0100123 def write_windows_console(self, *args):
124 retcode = subprocess.call(args,
125 # use a new console to not flood the test output
126 creationflags=subprocess.CREATE_NEW_CONSOLE,
127 # use a shell to hide the console window (SW_HIDE)
128 shell=True)
129 self.assertEqual(retcode, 0)
130
131 @unittest.skipUnless(sys.platform == 'win32',
132 'test specific to the Windows console')
133 def test_write_windows_console(self):
134 # Issue #11395: the Windows console returns an error (12: not enough
135 # space error) on writing into stdout if stdout mode is binary and the
136 # length is greater than 66,000 bytes (or less, depending on heap
137 # usage).
138 code = "print('x' * 100000)"
139 self.write_windows_console(sys.executable, "-c", code)
140 self.write_windows_console(sys.executable, "-u", "-c", code)
141
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000142 def fdopen_helper(self, *args):
143 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200144 f = os.fdopen(fd, *args)
145 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000146
147 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200148 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
149 os.close(fd)
150
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000151 self.fdopen_helper()
152 self.fdopen_helper('r')
153 self.fdopen_helper('r', 100)
154
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100155 def test_replace(self):
156 TESTFN2 = support.TESTFN + ".2"
157 with open(support.TESTFN, 'w') as f:
158 f.write("1")
159 with open(TESTFN2, 'w') as f:
160 f.write("2")
161 self.addCleanup(os.unlink, TESTFN2)
162 os.replace(support.TESTFN, TESTFN2)
163 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
164 with open(TESTFN2, 'r') as f:
165 self.assertEqual(f.read(), "1")
166
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200167
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000168# Test attributes on return values from os.*stat* family.
169class StatAttributeTests(unittest.TestCase):
170 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000171 os.mkdir(support.TESTFN)
172 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000173 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000174 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000175 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000176
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000177 def tearDown(self):
178 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000179 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000180
Serhiy Storchaka79080682013-11-03 21:31:18 +0200181 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000182 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000183 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000184
185 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000186 self.assertEqual(result[stat.ST_SIZE], 3)
187 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000188
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000189 # Make sure all the attributes are there
190 members = dir(result)
191 for name in dir(stat):
192 if name[:3] == 'ST_':
193 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000194 if name.endswith("TIME"):
195 def trunc(x): return int(x)
196 else:
197 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000198 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000199 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000200 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000201
Larry Hastings6fe20b32012-04-19 15:07:49 -0700202 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700203 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700204 for name in 'st_atime st_mtime st_ctime'.split():
205 floaty = int(getattr(result, name) * 100000)
206 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700207 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700208
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000209 try:
210 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200211 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000212 except IndexError:
213 pass
214
215 # Make sure that assignment fails
216 try:
217 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200218 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000219 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000220 pass
221
222 try:
223 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200224 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000225 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000226 pass
227
228 try:
229 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200230 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000231 except AttributeError:
232 pass
233
234 # Use the stat_result constructor with a too-short tuple.
235 try:
236 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200237 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000238 except TypeError:
239 pass
240
Ezio Melotti42da6632011-03-15 05:18:48 +0200241 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000242 try:
243 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
244 except TypeError:
245 pass
246
Antoine Pitrou38425292010-09-21 18:19:07 +0000247 def test_stat_attributes(self):
248 self.check_stat_attributes(self.fname)
249
250 def test_stat_attributes_bytes(self):
251 try:
252 fname = self.fname.encode(sys.getfilesystemencoding())
253 except UnicodeEncodeError:
254 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100255 with warnings.catch_warnings():
256 warnings.simplefilter("ignore", DeprecationWarning)
257 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000258
Serhiy Storchaka79080682013-11-03 21:31:18 +0200259 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000260 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000261 try:
262 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000263 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000264 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000265 if e.errno == errno.ENOSYS:
266 return
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000267
268 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000269 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000270
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000271 # Make sure all the attributes are there.
272 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
273 'ffree', 'favail', 'flag', 'namemax')
274 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000275 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000276
277 # Make sure that assignment really fails
278 try:
279 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200280 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000281 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000282 pass
283
284 try:
285 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200286 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000287 except AttributeError:
288 pass
289
290 # Use the constructor with a too-short tuple.
291 try:
292 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200293 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000294 except TypeError:
295 pass
296
Ezio Melotti42da6632011-03-15 05:18:48 +0200297 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000298 try:
299 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
300 except TypeError:
301 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000302
Thomas Wouters89f507f2006-12-13 04:49:30 +0000303 def test_utime_dir(self):
304 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000305 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000306 # round to int, because some systems may support sub-second
307 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000308 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
309 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000310 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000311
Larry Hastings76ad59b2012-05-03 00:30:07 -0700312 def _test_utime(self, filename, attr, utime, delta):
Brian Curtin0277aa32011-11-06 13:50:15 -0600313 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600314 # second argument. Check that the previous methods of passing
315 # a time tuple or None work in addition to no argument.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700316 st0 = os.stat(filename)
Brian Curtin52fbea12011-11-06 13:41:17 -0600317 # Doesn't set anything new, but sets the time tuple way
Larry Hastings76ad59b2012-05-03 00:30:07 -0700318 utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
319 # Setting the time to the time you just read, then reading again,
320 # should always return exactly the same times.
321 st1 = os.stat(filename)
322 self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
323 self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
Brian Curtin52fbea12011-11-06 13:41:17 -0600324 # Set to the current time in the old explicit way.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700325 os.utime(filename, None)
Brian Curtin52fbea12011-11-06 13:41:17 -0600326 st2 = os.stat(support.TESTFN)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700327 # Set to the current time in the new way
328 os.utime(filename)
329 st3 = os.stat(filename)
330 self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)
331
332 def test_utime(self):
333 def utime(file, times):
334 return os.utime(file, times)
335 self._test_utime(self.fname, getattr, utime, 10)
336 self._test_utime(support.TESTFN, getattr, utime, 10)
337
338
339 def _test_utime_ns(self, set_times_ns, test_dir=True):
340 def getattr_ns(o, attr):
341 return getattr(o, attr + "_ns")
342 ten_s = 10 * 1000 * 1000 * 1000
343 self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
344 if test_dir:
345 self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)
346
347 def test_utime_ns(self):
348 def utime_ns(file, times):
349 return os.utime(file, ns=times)
350 self._test_utime_ns(utime_ns)
351
Larry Hastings9cf065c2012-06-22 16:30:09 -0700352 requires_utime_dir_fd = unittest.skipUnless(
353 os.utime in os.supports_dir_fd,
354 "dir_fd support for utime required for this test.")
355 requires_utime_fd = unittest.skipUnless(
356 os.utime in os.supports_fd,
357 "fd support for utime required for this test.")
358 requires_utime_nofollow_symlinks = unittest.skipUnless(
359 os.utime in os.supports_follow_symlinks,
360 "follow_symlinks support for utime required for this test.")
Larry Hastings76ad59b2012-05-03 00:30:07 -0700361
Larry Hastings9cf065c2012-06-22 16:30:09 -0700362 @requires_utime_nofollow_symlinks
Larry Hastings76ad59b2012-05-03 00:30:07 -0700363 def test_lutimes_ns(self):
364 def lutimes_ns(file, times):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700365 return os.utime(file, ns=times, follow_symlinks=False)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700366 self._test_utime_ns(lutimes_ns)
367
Larry Hastings9cf065c2012-06-22 16:30:09 -0700368 @requires_utime_fd
Larry Hastings76ad59b2012-05-03 00:30:07 -0700369 def test_futimes_ns(self):
370 def futimes_ns(file, times):
371 with open(file, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700372 os.utime(f.fileno(), ns=times)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700373 self._test_utime_ns(futimes_ns, test_dir=False)
374
375 def _utime_invalid_arguments(self, name, arg):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700376 with self.assertRaises(ValueError):
Larry Hastings76ad59b2012-05-03 00:30:07 -0700377 getattr(os, name)(arg, (5, 5), ns=(5, 5))
378
379 def test_utime_invalid_arguments(self):
380 self._utime_invalid_arguments('utime', self.fname)
381
Brian Curtin52fbea12011-11-06 13:41:17 -0600382
Victor Stinner1aa54a42012-02-08 04:09:37 +0100383 @unittest.skipUnless(stat_supports_subsecond,
384 "os.stat() doesn't has a subsecond resolution")
Victor Stinnera2f7c002012-02-08 03:36:25 +0100385 def _test_utime_subsecond(self, set_time_func):
Victor Stinnerbe557de2012-02-08 03:01:11 +0100386 asec, amsec = 1, 901
387 atime = asec + amsec * 1e-3
Victor Stinnera2f7c002012-02-08 03:36:25 +0100388 msec, mmsec = 2, 901
Victor Stinnerbe557de2012-02-08 03:01:11 +0100389 mtime = msec + mmsec * 1e-3
390 filename = self.fname
Victor Stinnera2f7c002012-02-08 03:36:25 +0100391 os.utime(filename, (0, 0))
392 set_time_func(filename, atime, mtime)
Victor Stinner034d0aa2012-06-05 01:22:15 +0200393 with warnings.catch_warnings():
394 warnings.simplefilter("ignore", DeprecationWarning)
395 os.stat_float_times(True)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100396 st = os.stat(filename)
397 self.assertAlmostEqual(st.st_atime, atime, places=3)
398 self.assertAlmostEqual(st.st_mtime, mtime, places=3)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100399
Victor Stinnera2f7c002012-02-08 03:36:25 +0100400 def test_utime_subsecond(self):
401 def set_time(filename, atime, mtime):
402 os.utime(filename, (atime, mtime))
403 self._test_utime_subsecond(set_time)
404
Larry Hastings9cf065c2012-06-22 16:30:09 -0700405 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100406 def test_futimes_subsecond(self):
407 def set_time(filename, atime, mtime):
408 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700409 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100410 self._test_utime_subsecond(set_time)
411
Larry Hastings9cf065c2012-06-22 16:30:09 -0700412 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100413 def test_futimens_subsecond(self):
414 def set_time(filename, atime, mtime):
415 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700416 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100417 self._test_utime_subsecond(set_time)
418
Larry Hastings9cf065c2012-06-22 16:30:09 -0700419 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100420 def test_futimesat_subsecond(self):
421 def set_time(filename, atime, mtime):
422 dirname = os.path.dirname(filename)
423 dirfd = os.open(dirname, os.O_RDONLY)
424 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700425 os.utime(os.path.basename(filename), dir_fd=dirfd,
426 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100427 finally:
428 os.close(dirfd)
429 self._test_utime_subsecond(set_time)
430
Larry Hastings9cf065c2012-06-22 16:30:09 -0700431 @requires_utime_nofollow_symlinks
Victor Stinnera2f7c002012-02-08 03:36:25 +0100432 def test_lutimes_subsecond(self):
433 def set_time(filename, atime, mtime):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700434 os.utime(filename, (atime, mtime), follow_symlinks=False)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100435 self._test_utime_subsecond(set_time)
436
Larry Hastings9cf065c2012-06-22 16:30:09 -0700437 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100438 def test_utimensat_subsecond(self):
439 def set_time(filename, atime, mtime):
440 dirname = os.path.dirname(filename)
441 dirfd = os.open(dirname, os.O_RDONLY)
442 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700443 os.utime(os.path.basename(filename), dir_fd=dirfd,
444 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100445 finally:
446 os.close(dirfd)
447 self._test_utime_subsecond(set_time)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100448
Serhiy Storchaka79080682013-11-03 21:31:18 +0200449 # Restrict tests to Win32, since there is no guarantee other
Thomas Wouters89f507f2006-12-13 04:49:30 +0000450 # systems support centiseconds
Serhiy Storchaka79080682013-11-03 21:31:18 +0200451 def get_file_system(path):
452 if sys.platform == 'win32':
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000453 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000454 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000455 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000456 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000457 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000458 return buf.value
459
Serhiy Storchaka79080682013-11-03 21:31:18 +0200460 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
461 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
462 "requires NTFS")
463 def test_1565150(self):
464 t1 = 1159195039.25
465 os.utime(self.fname, (t1, t1))
466 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000467
Serhiy Storchaka79080682013-11-03 21:31:18 +0200468 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
469 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
470 "requires NTFS")
471 def test_large_time(self):
472 t1 = 5000000000 # some day in 2128
473 os.utime(self.fname, (t1, t1))
474 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000475
Serhiy Storchaka79080682013-11-03 21:31:18 +0200476 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
477 def test_1686475(self):
478 # Verify that an open file can be stat'ed
479 try:
480 os.stat(r"c:\pagefile.sys")
481 except WindowsError as e:
482 if e.errno == 2: # file does not exist; cannot run test
483 return
484 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000485
Serhiy Storchaka79080682013-11-03 21:31:18 +0200486 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
487 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
488 def test_15261(self):
489 # Verify that stat'ing a closed fd does not cause crash
490 r, w = os.pipe()
491 try:
492 os.stat(r) # should not raise error
493 finally:
494 os.close(r)
495 os.close(w)
496 with self.assertRaises(OSError) as ctx:
497 os.stat(r)
498 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100499
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000500from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000501
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000502class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000503 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000504 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000505
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000506 def setUp(self):
507 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000508 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000509 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000510 for key, value in self._reference().items():
511 os.environ[key] = value
512
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000513 def tearDown(self):
514 os.environ.clear()
515 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000516 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000517 os.environb.clear()
518 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000519
Christian Heimes90333392007-11-01 19:08:42 +0000520 def _reference(self):
521 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
522
523 def _empty_mapping(self):
524 os.environ.clear()
525 return os.environ
526
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000527 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300528 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000529 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000530 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300531 os.environ.update(HELLO="World")
532 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
533 value = popen.read().strip()
534 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000535
Ezio Melottic7e139b2012-09-26 20:01:34 +0300536 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000537 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300538 with os.popen(
539 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
540 it = iter(popen)
541 self.assertEqual(next(it), "line1\n")
542 self.assertEqual(next(it), "line2\n")
543 self.assertEqual(next(it), "line3\n")
544 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000545
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000546 # Verify environ keys and values from the OS are of the
547 # correct str type.
548 def test_keyvalue_types(self):
549 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000550 self.assertEqual(type(key), str)
551 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000552
Christian Heimes90333392007-11-01 19:08:42 +0000553 def test_items(self):
554 for key, value in self._reference().items():
555 self.assertEqual(os.environ.get(key), value)
556
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000557 # Issue 7310
558 def test___repr__(self):
559 """Check that the repr() of os.environ looks like environ({...})."""
560 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000561 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
562 '{!r}: {!r}'.format(key, value)
563 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000564
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000565 def test_get_exec_path(self):
566 defpath_list = os.defpath.split(os.pathsep)
567 test_path = ['/monty', '/python', '', '/flying/circus']
568 test_env = {'PATH': os.pathsep.join(test_path)}
569
570 saved_environ = os.environ
571 try:
572 os.environ = dict(test_env)
573 # Test that defaulting to os.environ works.
574 self.assertSequenceEqual(test_path, os.get_exec_path())
575 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
576 finally:
577 os.environ = saved_environ
578
579 # No PATH environment variable
580 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
581 # Empty PATH environment variable
582 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
583 # Supplied PATH environment variable
584 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
585
Victor Stinnerb745a742010-05-18 17:17:23 +0000586 if os.supports_bytes_environ:
587 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000588 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000589 # ignore BytesWarning warning
590 with warnings.catch_warnings(record=True):
591 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000592 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000593 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000594 pass
595 else:
596 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000597
598 # bytes key and/or value
599 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
600 ['abc'])
601 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
602 ['abc'])
603 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
604 ['abc'])
605
606 @unittest.skipUnless(os.supports_bytes_environ,
607 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000608 def test_environb(self):
609 # os.environ -> os.environb
610 value = 'euro\u20ac'
611 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000612 value_bytes = value.encode(sys.getfilesystemencoding(),
613 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000614 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000615 msg = "U+20AC character is not encodable to %s" % (
616 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000617 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000618 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000619 self.assertEqual(os.environ['unicode'], value)
620 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000621
622 # os.environb -> os.environ
623 value = b'\xff'
624 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000625 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000626 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000627 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000628
Charles-François Natali2966f102011-11-26 11:32:46 +0100629 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
630 # #13415).
631 @support.requires_freebsd_version(7)
632 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100633 def test_unset_error(self):
634 if sys.platform == "win32":
635 # an environment variable is limited to 32,767 characters
636 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100637 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100638 else:
639 # "=" is not allowed in a variable name
640 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100641 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100642
Victor Stinner6d101392013-04-14 16:35:04 +0200643 def test_key_type(self):
644 missing = 'missingkey'
645 self.assertNotIn(missing, os.environ)
646
Victor Stinner839e5ea2013-04-14 16:43:03 +0200647 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200648 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200649 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200650 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200651
Victor Stinner839e5ea2013-04-14 16:43:03 +0200652 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200653 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200654 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200655 self.assertTrue(cm.exception.__suppress_context__)
656
Victor Stinner6d101392013-04-14 16:35:04 +0200657
Tim Petersc4e09402003-04-25 07:11:48 +0000658class WalkTests(unittest.TestCase):
659 """Tests for os.walk()."""
660
Charles-François Natali7372b062012-02-05 15:15:38 +0100661 def setUp(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000662 import os
663 from os.path import join
664
665 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000666 # TESTFN/
667 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000668 # tmp1
669 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000670 # tmp2
671 # SUB11/ no kids
672 # SUB2/ a file kid and a dirsymlink kid
673 # tmp3
674 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200675 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000676 # TEST2/
677 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000678 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000679 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000680 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000681 sub2_path = join(walk_path, "SUB2")
682 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000683 tmp2_path = join(sub1_path, "tmp2")
684 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000685 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000686 t2_path = join(support.TESTFN, "TEST2")
687 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200688 link_path = join(sub2_path, "link")
689 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000690
691 # Create stuff.
692 os.makedirs(sub11_path)
693 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000694 os.makedirs(t2_path)
695 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000696 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000697 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
698 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000699 if support.can_symlink():
Jason R. Coombs3a092862013-05-27 23:21:28 -0400700 os.symlink(os.path.abspath(t2_path), link_path)
Jason R. Coombsb501b562013-05-27 23:52:43 -0400701 os.symlink('broken', broken_link_path, True)
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200702 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
Guido van Rossumd8faa362007-04-27 19:54:29 +0000703 else:
704 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000705
706 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000707 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000708 self.assertEqual(len(all), 4)
709 # We can't know which order SUB1 and SUB2 will appear in.
710 # Not flipped: TESTFN, SUB1, SUB11, SUB2
711 # flipped: TESTFN, SUB2, SUB1, SUB11
712 flipped = all[0][1][0] != "SUB1"
713 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200714 all[3 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000715 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000716 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
717 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000718 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000719
720 # Prune the search.
721 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000722 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000723 all.append((root, dirs, files))
724 # Don't descend into SUB1.
725 if 'SUB1' in dirs:
726 # Note that this also mutates the dirs we appended to all!
727 dirs.remove('SUB1')
728 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000729 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200730 all[1][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000731 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000732
733 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000734 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000735 self.assertEqual(len(all), 4)
736 # We can't know which order SUB1 and SUB2 will appear in.
737 # Not flipped: SUB11, SUB1, SUB2, TESTFN
738 # flipped: SUB2, SUB11, SUB1, TESTFN
739 flipped = all[3][1][0] != "SUB1"
740 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200741 all[2 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000742 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000743 self.assertEqual(all[flipped], (sub11_path, [], []))
744 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000745 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000746
Brian Curtin3b4499c2010-12-28 14:31:47 +0000747 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000748 # Walk, following symlinks.
749 for root, dirs, files in os.walk(walk_path, followlinks=True):
750 if root == link_path:
751 self.assertEqual(dirs, [])
752 self.assertEqual(files, ["tmp4"])
753 break
754 else:
755 self.fail("Didn't follow symlink with followlinks=True")
756
757 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000758 # Tear everything down. This is a decent use for bottom-up on
759 # Windows, which doesn't have a recursive delete command. The
760 # (not so) subtlety is that rmdir will fail unless the dir's
761 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000762 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000763 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000764 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000765 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000766 dirname = os.path.join(root, name)
767 if not os.path.islink(dirname):
768 os.rmdir(dirname)
769 else:
770 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000771 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000772
Charles-François Natali7372b062012-02-05 15:15:38 +0100773
774@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
775class FwalkTests(WalkTests):
776 """Tests for os.fwalk()."""
777
Larry Hastingsc48fe982012-06-25 04:49:05 -0700778 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
779 """
780 compare with walk() results.
781 """
Larry Hastingsb4038062012-07-15 10:57:38 -0700782 walk_kwargs = walk_kwargs.copy()
783 fwalk_kwargs = fwalk_kwargs.copy()
784 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
785 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
786 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -0700787
Charles-François Natali7372b062012-02-05 15:15:38 +0100788 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -0700789 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100790 expected[root] = (set(dirs), set(files))
791
Larry Hastingsc48fe982012-06-25 04:49:05 -0700792 for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100793 self.assertIn(root, expected)
794 self.assertEqual(expected[root], (set(dirs), set(files)))
795
Larry Hastingsc48fe982012-06-25 04:49:05 -0700796 def test_compare_to_walk(self):
797 kwargs = {'top': support.TESTFN}
798 self._compare_to_walk(kwargs, kwargs)
799
Charles-François Natali7372b062012-02-05 15:15:38 +0100800 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -0700801 try:
802 fd = os.open(".", os.O_RDONLY)
803 walk_kwargs = {'top': support.TESTFN}
804 fwalk_kwargs = walk_kwargs.copy()
805 fwalk_kwargs['dir_fd'] = fd
806 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
807 finally:
808 os.close(fd)
809
810 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +0100811 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -0700812 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
813 args = support.TESTFN, topdown, None
814 for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +0100815 # check that the FD is valid
816 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700817 # redundant check
818 os.stat(rootfd)
819 # check that listdir() returns consistent information
820 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100821
822 def test_fd_leak(self):
823 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
824 # we both check that calling fwalk() a large number of times doesn't
825 # yield EMFILE, and that the minimum allocated FD hasn't changed.
826 minfd = os.dup(1)
827 os.close(minfd)
828 for i in range(256):
829 for x in os.fwalk(support.TESTFN):
830 pass
831 newfd = os.dup(1)
832 self.addCleanup(os.close, newfd)
833 self.assertEqual(newfd, minfd)
834
835 def tearDown(self):
836 # cleanup
837 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
838 for name in files:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700839 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100840 for name in dirs:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700841 st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
Larry Hastingsb698d8e2012-06-23 16:55:07 -0700842 if stat.S_ISDIR(st.st_mode):
843 os.rmdir(name, dir_fd=rootfd)
844 else:
845 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100846 os.rmdir(support.TESTFN)
847
848
Guido van Rossume7ba4952007-06-06 23:52:48 +0000849class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000850 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000851 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000852
853 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000854 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000855 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
856 os.makedirs(path) # Should work
857 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
858 os.makedirs(path)
859
860 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000861 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000862 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
863 os.makedirs(path)
864 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
865 'dir5', 'dir6')
866 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000867
Terry Reedy5a22b652010-12-02 07:05:56 +0000868 def test_exist_ok_existing_directory(self):
869 path = os.path.join(support.TESTFN, 'dir1')
870 mode = 0o777
871 old_mask = os.umask(0o022)
872 os.makedirs(path, mode)
873 self.assertRaises(OSError, os.makedirs, path, mode)
874 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
875 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
876 os.makedirs(path, mode=mode, exist_ok=True)
877 os.umask(old_mask)
878
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700879 def test_exist_ok_s_isgid_directory(self):
880 path = os.path.join(support.TESTFN, 'dir1')
881 S_ISGID = stat.S_ISGID
882 mode = 0o777
883 old_mask = os.umask(0o022)
884 try:
885 existing_testfn_mode = stat.S_IMODE(
886 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -0700887 try:
888 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -0700889 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -0700890 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700891 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
892 raise unittest.SkipTest('No support for S_ISGID dir mode.')
893 # The os should apply S_ISGID from the parent dir for us, but
894 # this test need not depend on that behavior. Be explicit.
895 os.makedirs(path, mode | S_ISGID)
896 # http://bugs.python.org/issue14992
897 # Should not fail when the bit is already set.
898 os.makedirs(path, mode, exist_ok=True)
899 # remove the bit.
900 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
901 with self.assertRaises(OSError):
902 # Should fail when the bit is not already set when demanded.
903 os.makedirs(path, mode | S_ISGID, exist_ok=True)
904 finally:
905 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +0000906
907 def test_exist_ok_existing_regular_file(self):
908 base = support.TESTFN
909 path = os.path.join(support.TESTFN, 'dir1')
910 f = open(path, 'w')
911 f.write('abc')
912 f.close()
913 self.assertRaises(OSError, os.makedirs, path)
914 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
915 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
916 os.remove(path)
917
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000918 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000919 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000920 'dir4', 'dir5', 'dir6')
921 # If the tests failed, the bottom-most directory ('../dir6')
922 # may not have been created, so we look for the outermost directory
923 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000924 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000925 path = os.path.dirname(path)
926
927 os.removedirs(path)
928
Andrew Svetlov405faed2012-12-25 12:18:09 +0200929
930class RemoveDirsTests(unittest.TestCase):
931 def setUp(self):
932 os.makedirs(support.TESTFN)
933
934 def tearDown(self):
935 support.rmtree(support.TESTFN)
936
937 def test_remove_all(self):
938 dira = os.path.join(support.TESTFN, 'dira')
939 os.mkdir(dira)
940 dirb = os.path.join(dira, 'dirb')
941 os.mkdir(dirb)
942 os.removedirs(dirb)
943 self.assertFalse(os.path.exists(dirb))
944 self.assertFalse(os.path.exists(dira))
945 self.assertFalse(os.path.exists(support.TESTFN))
946
947 def test_remove_partial(self):
948 dira = os.path.join(support.TESTFN, 'dira')
949 os.mkdir(dira)
950 dirb = os.path.join(dira, 'dirb')
951 os.mkdir(dirb)
952 with open(os.path.join(dira, 'file.txt'), 'w') as f:
953 f.write('text')
954 os.removedirs(dirb)
955 self.assertFalse(os.path.exists(dirb))
956 self.assertTrue(os.path.exists(dira))
957 self.assertTrue(os.path.exists(support.TESTFN))
958
959 def test_remove_nothing(self):
960 dira = os.path.join(support.TESTFN, 'dira')
961 os.mkdir(dira)
962 dirb = os.path.join(dira, 'dirb')
963 os.mkdir(dirb)
964 with open(os.path.join(dirb, 'file.txt'), 'w') as f:
965 f.write('text')
966 with self.assertRaises(OSError):
967 os.removedirs(dirb)
968 self.assertTrue(os.path.exists(dirb))
969 self.assertTrue(os.path.exists(dira))
970 self.assertTrue(os.path.exists(support.TESTFN))
971
972
Guido van Rossume7ba4952007-06-06 23:52:48 +0000973class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000974 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +0200975 with open(os.devnull, 'wb') as f:
976 f.write(b'hello')
977 f.close()
978 with open(os.devnull, 'rb') as f:
979 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000980
Andrew Svetlov405faed2012-12-25 12:18:09 +0200981
Guido van Rossume7ba4952007-06-06 23:52:48 +0000982class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +0100983 def test_urandom_length(self):
984 self.assertEqual(len(os.urandom(0)), 0)
985 self.assertEqual(len(os.urandom(1)), 1)
986 self.assertEqual(len(os.urandom(10)), 10)
987 self.assertEqual(len(os.urandom(100)), 100)
988 self.assertEqual(len(os.urandom(1000)), 1000)
989
990 def test_urandom_value(self):
991 data1 = os.urandom(16)
992 data2 = os.urandom(16)
993 self.assertNotEqual(data1, data2)
994
995 def get_urandom_subprocess(self, count):
996 code = '\n'.join((
997 'import os, sys',
998 'data = os.urandom(%s)' % count,
999 'sys.stdout.buffer.write(data)',
1000 'sys.stdout.buffer.flush()'))
1001 out = assert_python_ok('-c', code)
1002 stdout = out[1]
1003 self.assertEqual(len(stdout), 16)
1004 return stdout
1005
1006 def test_urandom_subprocess(self):
1007 data1 = self.get_urandom_subprocess(16)
1008 data2 = self.get_urandom_subprocess(16)
1009 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001010
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001011 @unittest.skipUnless(resource, "test requires the resource module")
1012 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001013 # Check urandom() failing when it is not able to open /dev/random.
1014 # We spawn a new process to make the test more robust (if getrlimit()
1015 # failed to restore the file descriptor limit after this, the whole
1016 # test suite would crash; this actually happened on the OS X Tiger
1017 # buildbot).
1018 code = """if 1:
1019 import errno
1020 import os
1021 import resource
1022
1023 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1024 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1025 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001026 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001027 except OSError as e:
1028 assert e.errno == errno.EMFILE, e.errno
1029 else:
1030 raise AssertionError("OSError not raised")
1031 """
1032 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001033
1034
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001035@contextlib.contextmanager
1036def _execvpe_mockup(defpath=None):
1037 """
1038 Stubs out execv and execve functions when used as context manager.
1039 Records exec calls. The mock execv and execve functions always raise an
1040 exception as they would normally never return.
1041 """
1042 # A list of tuples containing (function name, first arg, args)
1043 # of calls to execv or execve that have been made.
1044 calls = []
1045
1046 def mock_execv(name, *args):
1047 calls.append(('execv', name, args))
1048 raise RuntimeError("execv called")
1049
1050 def mock_execve(name, *args):
1051 calls.append(('execve', name, args))
1052 raise OSError(errno.ENOTDIR, "execve called")
1053
1054 try:
1055 orig_execv = os.execv
1056 orig_execve = os.execve
1057 orig_defpath = os.defpath
1058 os.execv = mock_execv
1059 os.execve = mock_execve
1060 if defpath is not None:
1061 os.defpath = defpath
1062 yield calls
1063 finally:
1064 os.execv = orig_execv
1065 os.execve = orig_execve
1066 os.defpath = orig_defpath
1067
Guido van Rossume7ba4952007-06-06 23:52:48 +00001068class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001069 @unittest.skipIf(USING_LINUXTHREADS,
1070 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001071 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001072 self.assertRaises(OSError, os.execvpe, 'no such app-',
1073 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001074
Thomas Heller6790d602007-08-30 17:15:14 +00001075 def test_execvpe_with_bad_arglist(self):
1076 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1077
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001078 @unittest.skipUnless(hasattr(os, '_execvpe'),
1079 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001080 def _test_internal_execvpe(self, test_type):
1081 program_path = os.sep + 'absolutepath'
1082 if test_type is bytes:
1083 program = b'executable'
1084 fullpath = os.path.join(os.fsencode(program_path), program)
1085 native_fullpath = fullpath
1086 arguments = [b'progname', 'arg1', 'arg2']
1087 else:
1088 program = 'executable'
1089 arguments = ['progname', 'arg1', 'arg2']
1090 fullpath = os.path.join(program_path, program)
1091 if os.name != "nt":
1092 native_fullpath = os.fsencode(fullpath)
1093 else:
1094 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001095 env = {'spam': 'beans'}
1096
Victor Stinnerb745a742010-05-18 17:17:23 +00001097 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001098 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001099 self.assertRaises(RuntimeError,
1100 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001101 self.assertEqual(len(calls), 1)
1102 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1103
Victor Stinnerb745a742010-05-18 17:17:23 +00001104 # test os._execvpe() with a relative path:
1105 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001106 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001107 self.assertRaises(OSError,
1108 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001109 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001110 self.assertSequenceEqual(calls[0],
1111 ('execve', native_fullpath, (arguments, env)))
1112
1113 # test os._execvpe() with a relative path:
1114 # os.get_exec_path() reads the 'PATH' variable
1115 with _execvpe_mockup() as calls:
1116 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001117 if test_type is bytes:
1118 env_path[b'PATH'] = program_path
1119 else:
1120 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001121 self.assertRaises(OSError,
1122 os._execvpe, program, arguments, env=env_path)
1123 self.assertEqual(len(calls), 1)
1124 self.assertSequenceEqual(calls[0],
1125 ('execve', native_fullpath, (arguments, env_path)))
1126
1127 def test_internal_execvpe_str(self):
1128 self._test_internal_execvpe(str)
1129 if os.name != "nt":
1130 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001131
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001132
Serhiy Storchaka79080682013-11-03 21:31:18 +02001133@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001134class Win32ErrorTests(unittest.TestCase):
1135 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001136 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001137
1138 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001139 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001140
1141 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001142 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001143
1144 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001145 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001146 try:
1147 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
1148 finally:
1149 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001150 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001151
1152 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001153 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001154
Thomas Wouters477c8d52006-05-27 19:21:47 +00001155 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +00001156 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001157
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001158class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001159 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001160 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1161 #singles.append("close")
1162 #We omit close because it doesn'r raise an exception on some platforms
1163 def get_single(f):
1164 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001165 if hasattr(os, f):
1166 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001167 return helper
1168 for f in singles:
1169 locals()["test_"+f] = get_single(f)
1170
Benjamin Peterson7522c742009-01-19 21:00:09 +00001171 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001172 try:
1173 f(support.make_bad_fd(), *args)
1174 except OSError as e:
1175 self.assertEqual(e.errno, errno.EBADF)
1176 else:
1177 self.fail("%r didn't raise a OSError with a bad file descriptor"
1178 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001179
Serhiy Storchaka79080682013-11-03 21:31:18 +02001180 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001181 def test_isatty(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001182 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001183
Serhiy Storchaka79080682013-11-03 21:31:18 +02001184 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001185 def test_closerange(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001186 fd = support.make_bad_fd()
1187 # Make sure none of the descriptors we are about to close are
1188 # currently valid (issue 6542).
1189 for i in range(10):
1190 try: os.fstat(fd+i)
1191 except OSError:
1192 pass
1193 else:
1194 break
1195 if i < 2:
1196 raise unittest.SkipTest(
1197 "Unable to acquire a range of invalid file descriptors")
1198 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001199
Serhiy Storchaka79080682013-11-03 21:31:18 +02001200 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001201 def test_dup2(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001202 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001203
Serhiy Storchaka79080682013-11-03 21:31:18 +02001204 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001205 def test_fchmod(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001206 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001207
Serhiy Storchaka79080682013-11-03 21:31:18 +02001208 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001209 def test_fchown(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001210 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001211
Serhiy Storchaka79080682013-11-03 21:31:18 +02001212 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001213 def test_fpathconf(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001214 self.check(os.pathconf, "PC_NAME_MAX")
1215 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001216
Serhiy Storchaka79080682013-11-03 21:31:18 +02001217 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001218 def test_ftruncate(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001219 self.check(os.truncate, 0)
1220 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001221
Serhiy Storchaka79080682013-11-03 21:31:18 +02001222 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001223 def test_lseek(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001224 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001225
Serhiy Storchaka79080682013-11-03 21:31:18 +02001226 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001227 def test_read(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001228 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001229
Serhiy Storchaka79080682013-11-03 21:31:18 +02001230 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001231 def test_tcsetpgrpt(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001232 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001233
Serhiy Storchaka79080682013-11-03 21:31:18 +02001234 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001235 def test_write(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001236 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001237
Brian Curtin1b9df392010-11-24 20:24:31 +00001238
1239class LinkTests(unittest.TestCase):
1240 def setUp(self):
1241 self.file1 = support.TESTFN
1242 self.file2 = os.path.join(support.TESTFN + "2")
1243
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001244 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001245 for file in (self.file1, self.file2):
1246 if os.path.exists(file):
1247 os.unlink(file)
1248
Brian Curtin1b9df392010-11-24 20:24:31 +00001249 def _test_link(self, file1, file2):
1250 with open(file1, "w") as f1:
1251 f1.write("test")
1252
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001253 with warnings.catch_warnings():
1254 warnings.simplefilter("ignore", DeprecationWarning)
1255 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001256 with open(file1, "r") as f1, open(file2, "r") as f2:
1257 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1258
1259 def test_link(self):
1260 self._test_link(self.file1, self.file2)
1261
1262 def test_link_bytes(self):
1263 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1264 bytes(self.file2, sys.getfilesystemencoding()))
1265
Brian Curtinf498b752010-11-30 15:54:04 +00001266 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001267 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001268 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001269 except UnicodeError:
1270 raise unittest.SkipTest("Unable to encode for this platform.")
1271
Brian Curtinf498b752010-11-30 15:54:04 +00001272 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001273 self.file2 = self.file1 + "2"
1274 self._test_link(self.file1, self.file2)
1275
Serhiy Storchaka79080682013-11-03 21:31:18 +02001276@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1277class PosixUidGidTests(unittest.TestCase):
1278 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1279 def test_setuid(self):
1280 if os.getuid() != 0:
1281 self.assertRaises(os.error, os.setuid, 0)
1282 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001283
Serhiy Storchaka79080682013-11-03 21:31:18 +02001284 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1285 def test_setgid(self):
1286 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1287 self.assertRaises(os.error, os.setgid, 0)
1288 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001289
Serhiy Storchaka79080682013-11-03 21:31:18 +02001290 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1291 def test_seteuid(self):
1292 if os.getuid() != 0:
1293 self.assertRaises(os.error, os.seteuid, 0)
1294 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001295
Serhiy Storchaka79080682013-11-03 21:31:18 +02001296 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1297 def test_setegid(self):
1298 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1299 self.assertRaises(os.error, os.setegid, 0)
1300 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001301
Serhiy Storchaka79080682013-11-03 21:31:18 +02001302 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1303 def test_setreuid(self):
1304 if os.getuid() != 0:
1305 self.assertRaises(os.error, os.setreuid, 0, 0)
1306 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1307 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001308
Serhiy Storchaka79080682013-11-03 21:31:18 +02001309 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1310 def test_setregid(self):
1311 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1312 self.assertRaises(os.error, os.setregid, 0, 0)
1313 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1314 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001315
Serhiy Storchaka79080682013-11-03 21:31:18 +02001316@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1317class Pep383Tests(unittest.TestCase):
1318 def setUp(self):
1319 if support.TESTFN_UNENCODABLE:
1320 self.dir = support.TESTFN_UNENCODABLE
1321 elif support.TESTFN_NONASCII:
1322 self.dir = support.TESTFN_NONASCII
1323 else:
1324 self.dir = support.TESTFN
1325 self.bdir = os.fsencode(self.dir)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001326
Serhiy Storchaka79080682013-11-03 21:31:18 +02001327 bytesfn = []
1328 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001329 try:
Serhiy Storchaka79080682013-11-03 21:31:18 +02001330 fn = os.fsencode(fn)
1331 except UnicodeEncodeError:
1332 return
1333 bytesfn.append(fn)
1334 add_filename(support.TESTFN_UNICODE)
1335 if support.TESTFN_UNENCODABLE:
1336 add_filename(support.TESTFN_UNENCODABLE)
1337 if support.TESTFN_NONASCII:
1338 add_filename(support.TESTFN_NONASCII)
1339 if not bytesfn:
1340 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001341
Serhiy Storchaka79080682013-11-03 21:31:18 +02001342 self.unicodefn = set()
1343 os.mkdir(self.dir)
1344 try:
1345 for fn in bytesfn:
1346 support.create_empty_file(os.path.join(self.bdir, fn))
1347 fn = os.fsdecode(fn)
1348 if fn in self.unicodefn:
1349 raise ValueError("duplicate filename")
1350 self.unicodefn.add(fn)
1351 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001352 shutil.rmtree(self.dir)
Serhiy Storchaka79080682013-11-03 21:31:18 +02001353 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001354
Serhiy Storchaka79080682013-11-03 21:31:18 +02001355 def tearDown(self):
1356 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001357
Serhiy Storchaka79080682013-11-03 21:31:18 +02001358 def test_listdir(self):
1359 expected = self.unicodefn
1360 found = set(os.listdir(self.dir))
1361 self.assertEqual(found, expected)
1362 # test listdir without arguments
1363 current_directory = os.getcwd()
1364 try:
1365 os.chdir(os.sep)
1366 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1367 finally:
1368 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001369
Serhiy Storchaka79080682013-11-03 21:31:18 +02001370 def test_open(self):
1371 for fn in self.unicodefn:
1372 f = open(os.path.join(self.dir, fn), 'rb')
1373 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001374
Serhiy Storchaka79080682013-11-03 21:31:18 +02001375 @unittest.skipUnless(hasattr(os, 'statvfs'),
1376 "need os.statvfs()")
1377 def test_statvfs(self):
1378 # issue #9645
1379 for fn in self.unicodefn:
1380 # should not fail with file not found error
1381 fullname = os.path.join(self.dir, fn)
1382 os.statvfs(fullname)
1383
1384 def test_stat(self):
1385 for fn in self.unicodefn:
1386 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001387
Brian Curtineb24d742010-04-12 17:16:38 +00001388@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1389class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001390 def _kill(self, sig):
1391 # Start sys.executable as a subprocess and communicate from the
1392 # subprocess to the parent that the interpreter is ready. When it
1393 # becomes ready, send *sig* via os.kill to the subprocess and check
1394 # that the return code is equal to *sig*.
1395 import ctypes
1396 from ctypes import wintypes
1397 import msvcrt
1398
1399 # Since we can't access the contents of the process' stdout until the
1400 # process has exited, use PeekNamedPipe to see what's inside stdout
1401 # without waiting. This is done so we can tell that the interpreter
1402 # is started and running at a point where it could handle a signal.
1403 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1404 PeekNamedPipe.restype = wintypes.BOOL
1405 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1406 ctypes.POINTER(ctypes.c_char), # stdout buf
1407 wintypes.DWORD, # Buffer size
1408 ctypes.POINTER(wintypes.DWORD), # bytes read
1409 ctypes.POINTER(wintypes.DWORD), # bytes avail
1410 ctypes.POINTER(wintypes.DWORD)) # bytes left
1411 msg = "running"
1412 proc = subprocess.Popen([sys.executable, "-c",
1413 "import sys;"
1414 "sys.stdout.write('{}');"
1415 "sys.stdout.flush();"
1416 "input()".format(msg)],
1417 stdout=subprocess.PIPE,
1418 stderr=subprocess.PIPE,
1419 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001420 self.addCleanup(proc.stdout.close)
1421 self.addCleanup(proc.stderr.close)
1422 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001423
1424 count, max = 0, 100
1425 while count < max and proc.poll() is None:
1426 # Create a string buffer to store the result of stdout from the pipe
1427 buf = ctypes.create_string_buffer(len(msg))
1428 # Obtain the text currently in proc.stdout
1429 # Bytes read/avail/left are left as NULL and unused
1430 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1431 buf, ctypes.sizeof(buf), None, None, None)
1432 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1433 if buf.value:
1434 self.assertEqual(msg, buf.value.decode())
1435 break
1436 time.sleep(0.1)
1437 count += 1
1438 else:
1439 self.fail("Did not receive communication from the subprocess")
1440
Brian Curtineb24d742010-04-12 17:16:38 +00001441 os.kill(proc.pid, sig)
1442 self.assertEqual(proc.wait(), sig)
1443
1444 def test_kill_sigterm(self):
1445 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001446 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001447
1448 def test_kill_int(self):
1449 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001450 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001451
1452 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001453 tagname = "test_os_%s" % uuid.uuid1()
1454 m = mmap.mmap(-1, 1, tagname)
1455 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001456 # Run a script which has console control handling enabled.
1457 proc = subprocess.Popen([sys.executable,
1458 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001459 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001460 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1461 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001462 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001463 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001464 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001465 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001466 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001467 count += 1
1468 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001469 # Forcefully kill the process if we weren't able to signal it.
1470 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001471 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001472 os.kill(proc.pid, event)
1473 # proc.send_signal(event) could also be done here.
1474 # Allow time for the signal to be passed and the process to exit.
1475 time.sleep(0.5)
1476 if not proc.poll():
1477 # Forcefully kill the process if we weren't able to signal it.
1478 os.kill(proc.pid, signal.SIGINT)
1479 self.fail("subprocess did not stop on {}".format(name))
1480
1481 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1482 def test_CTRL_C_EVENT(self):
1483 from ctypes import wintypes
1484 import ctypes
1485
1486 # Make a NULL value by creating a pointer with no argument.
1487 NULL = ctypes.POINTER(ctypes.c_int)()
1488 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1489 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1490 wintypes.BOOL)
1491 SetConsoleCtrlHandler.restype = wintypes.BOOL
1492
1493 # Calling this with NULL and FALSE causes the calling process to
1494 # handle CTRL+C, rather than ignore it. This property is inherited
1495 # by subprocesses.
1496 SetConsoleCtrlHandler(NULL, 0)
1497
1498 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1499
1500 def test_CTRL_BREAK_EVENT(self):
1501 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1502
1503
Brian Curtind40e6f72010-07-08 21:39:08 +00001504@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001505@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001506class Win32SymlinkTests(unittest.TestCase):
1507 filelink = 'filelinktest'
1508 filelink_target = os.path.abspath(__file__)
1509 dirlink = 'dirlinktest'
1510 dirlink_target = os.path.dirname(filelink_target)
1511 missing_link = 'missing link'
1512
1513 def setUp(self):
1514 assert os.path.exists(self.dirlink_target)
1515 assert os.path.exists(self.filelink_target)
1516 assert not os.path.exists(self.dirlink)
1517 assert not os.path.exists(self.filelink)
1518 assert not os.path.exists(self.missing_link)
1519
1520 def tearDown(self):
1521 if os.path.exists(self.filelink):
1522 os.remove(self.filelink)
1523 if os.path.exists(self.dirlink):
1524 os.rmdir(self.dirlink)
1525 if os.path.lexists(self.missing_link):
1526 os.remove(self.missing_link)
1527
1528 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001529 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001530 self.assertTrue(os.path.exists(self.dirlink))
1531 self.assertTrue(os.path.isdir(self.dirlink))
1532 self.assertTrue(os.path.islink(self.dirlink))
1533 self.check_stat(self.dirlink, self.dirlink_target)
1534
1535 def test_file_link(self):
1536 os.symlink(self.filelink_target, self.filelink)
1537 self.assertTrue(os.path.exists(self.filelink))
1538 self.assertTrue(os.path.isfile(self.filelink))
1539 self.assertTrue(os.path.islink(self.filelink))
1540 self.check_stat(self.filelink, self.filelink_target)
1541
1542 def _create_missing_dir_link(self):
1543 'Create a "directory" link to a non-existent target'
1544 linkname = self.missing_link
1545 if os.path.lexists(linkname):
1546 os.remove(linkname)
1547 target = r'c:\\target does not exist.29r3c740'
1548 assert not os.path.exists(target)
1549 target_is_dir = True
1550 os.symlink(target, linkname, target_is_dir)
1551
1552 def test_remove_directory_link_to_missing_target(self):
1553 self._create_missing_dir_link()
1554 # For compatibility with Unix, os.remove will check the
1555 # directory status and call RemoveDirectory if the symlink
1556 # was created with target_is_dir==True.
1557 os.remove(self.missing_link)
1558
1559 @unittest.skip("currently fails; consider for improvement")
1560 def test_isdir_on_directory_link_to_missing_target(self):
1561 self._create_missing_dir_link()
1562 # consider having isdir return true for directory links
1563 self.assertTrue(os.path.isdir(self.missing_link))
1564
1565 @unittest.skip("currently fails; consider for improvement")
1566 def test_rmdir_on_directory_link_to_missing_target(self):
1567 self._create_missing_dir_link()
1568 # consider allowing rmdir to remove directory links
1569 os.rmdir(self.missing_link)
1570
1571 def check_stat(self, link, target):
1572 self.assertEqual(os.stat(link), os.stat(target))
1573 self.assertNotEqual(os.lstat(link), os.stat(link))
1574
Brian Curtind25aef52011-06-13 15:16:04 -05001575 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001576 with warnings.catch_warnings():
1577 warnings.simplefilter("ignore", DeprecationWarning)
1578 self.assertEqual(os.stat(bytes_link), os.stat(target))
1579 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001580
1581 def test_12084(self):
1582 level1 = os.path.abspath(support.TESTFN)
1583 level2 = os.path.join(level1, "level2")
1584 level3 = os.path.join(level2, "level3")
1585 try:
1586 os.mkdir(level1)
1587 os.mkdir(level2)
1588 os.mkdir(level3)
1589
1590 file1 = os.path.abspath(os.path.join(level1, "file1"))
1591
1592 with open(file1, "w") as f:
1593 f.write("file1")
1594
1595 orig_dir = os.getcwd()
1596 try:
1597 os.chdir(level2)
1598 link = os.path.join(level2, "link")
1599 os.symlink(os.path.relpath(file1), "link")
1600 self.assertIn("link", os.listdir(os.getcwd()))
1601
1602 # Check os.stat calls from the same dir as the link
1603 self.assertEqual(os.stat(file1), os.stat("link"))
1604
1605 # Check os.stat calls from a dir below the link
1606 os.chdir(level1)
1607 self.assertEqual(os.stat(file1),
1608 os.stat(os.path.relpath(link)))
1609
1610 # Check os.stat calls from a dir above the link
1611 os.chdir(level3)
1612 self.assertEqual(os.stat(file1),
1613 os.stat(os.path.relpath(link)))
1614 finally:
1615 os.chdir(orig_dir)
1616 except OSError as err:
1617 self.fail(err)
1618 finally:
1619 os.remove(file1)
1620 shutil.rmtree(level1)
1621
Brian Curtind40e6f72010-07-08 21:39:08 +00001622
Jason R. Coombs3a092862013-05-27 23:21:28 -04001623@support.skip_unless_symlink
1624class NonLocalSymlinkTests(unittest.TestCase):
1625
1626 def setUp(self):
1627 """
1628 Create this structure:
1629
1630 base
1631 \___ some_dir
1632 """
1633 os.makedirs('base/some_dir')
1634
1635 def tearDown(self):
1636 shutil.rmtree('base')
1637
1638 def test_directory_link_nonlocal(self):
1639 """
1640 The symlink target should resolve relative to the link, not relative
1641 to the current directory.
1642
1643 Then, link base/some_link -> base/some_dir and ensure that some_link
1644 is resolved as a directory.
1645
1646 In issue13772, it was discovered that directory detection failed if
1647 the symlink target was not specified relative to the current
1648 directory, which was a defect in the implementation.
1649 """
1650 src = os.path.join('base', 'some_link')
1651 os.symlink('some_dir', src)
1652 assert os.path.isdir(src)
1653
1654
Victor Stinnere8d51452010-08-19 01:05:19 +00001655class FSEncodingTests(unittest.TestCase):
1656 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001657 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1658 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001659
Victor Stinnere8d51452010-08-19 01:05:19 +00001660 def test_identity(self):
1661 # assert fsdecode(fsencode(x)) == x
1662 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1663 try:
1664 bytesfn = os.fsencode(fn)
1665 except UnicodeEncodeError:
1666 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001667 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001668
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001669
Brett Cannonefb00c02012-02-29 18:31:31 -05001670
1671class DeviceEncodingTests(unittest.TestCase):
1672
1673 def test_bad_fd(self):
1674 # Return None when an fd doesn't actually exist.
1675 self.assertIsNone(os.device_encoding(123456))
1676
Philip Jenveye308b7c2012-02-29 16:16:15 -08001677 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
1678 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08001679 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05001680 def test_device_encoding(self):
1681 encoding = os.device_encoding(0)
1682 self.assertIsNotNone(encoding)
1683 self.assertTrue(codecs.lookup(encoding))
1684
1685
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001686class PidTests(unittest.TestCase):
1687 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1688 def test_getppid(self):
1689 p = subprocess.Popen([sys.executable, '-c',
1690 'import os; print(os.getppid())'],
1691 stdout=subprocess.PIPE)
1692 stdout, _ = p.communicate()
1693 # We are the parent of our subprocess
1694 self.assertEqual(int(stdout), os.getpid())
1695
1696
Brian Curtin0151b8e2010-09-24 13:43:43 +00001697# The introduction of this TestCase caused at least two different errors on
1698# *nix buildbots. Temporarily skip this to let the buildbots move along.
1699@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001700@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1701class LoginTests(unittest.TestCase):
1702 def test_getlogin(self):
1703 user_name = os.getlogin()
1704 self.assertNotEqual(len(user_name), 0)
1705
1706
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001707@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1708 "needs os.getpriority and os.setpriority")
1709class ProgramPriorityTests(unittest.TestCase):
1710 """Tests for os.getpriority() and os.setpriority()."""
1711
1712 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001713
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001714 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1715 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1716 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001717 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1718 if base >= 19 and new_prio <= 19:
1719 raise unittest.SkipTest(
1720 "unable to reliably test setpriority at current nice level of %s" % base)
1721 else:
1722 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001723 finally:
1724 try:
1725 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1726 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001727 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001728 raise
1729
1730
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001731if threading is not None:
1732 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001733
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001734 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001735
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001736 def __init__(self, conn):
1737 asynchat.async_chat.__init__(self, conn)
1738 self.in_buffer = []
1739 self.closed = False
1740 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001741
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001742 def handle_read(self):
1743 data = self.recv(4096)
1744 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001745
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001746 def get_data(self):
1747 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001748
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001749 def handle_close(self):
1750 self.close()
1751 self.closed = True
1752
1753 def handle_error(self):
1754 raise
1755
1756 def __init__(self, address):
1757 threading.Thread.__init__(self)
1758 asyncore.dispatcher.__init__(self)
1759 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1760 self.bind(address)
1761 self.listen(5)
1762 self.host, self.port = self.socket.getsockname()[:2]
1763 self.handler_instance = None
1764 self._active = False
1765 self._active_lock = threading.Lock()
1766
1767 # --- public API
1768
1769 @property
1770 def running(self):
1771 return self._active
1772
1773 def start(self):
1774 assert not self.running
1775 self.__flag = threading.Event()
1776 threading.Thread.start(self)
1777 self.__flag.wait()
1778
1779 def stop(self):
1780 assert self.running
1781 self._active = False
1782 self.join()
1783
1784 def wait(self):
1785 # wait for handler connection to be closed, then stop the server
1786 while not getattr(self.handler_instance, "closed", False):
1787 time.sleep(0.001)
1788 self.stop()
1789
1790 # --- internals
1791
1792 def run(self):
1793 self._active = True
1794 self.__flag.set()
1795 while self._active and asyncore.socket_map:
1796 self._active_lock.acquire()
1797 asyncore.loop(timeout=0.001, count=1)
1798 self._active_lock.release()
1799 asyncore.close_all()
1800
1801 def handle_accept(self):
1802 conn, addr = self.accept()
1803 self.handler_instance = self.Handler(conn)
1804
1805 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001806 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001807 handle_read = handle_connect
1808
1809 def writable(self):
1810 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001811
1812 def handle_error(self):
1813 raise
1814
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001815
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001816@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001817@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1818class TestSendfile(unittest.TestCase):
1819
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001820 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001821 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001822 not sys.platform.startswith("solaris") and \
1823 not sys.platform.startswith("sunos")
Serhiy Storchaka79080682013-11-03 21:31:18 +02001824 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
1825 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001826
1827 @classmethod
1828 def setUpClass(cls):
1829 with open(support.TESTFN, "wb") as f:
1830 f.write(cls.DATA)
1831
1832 @classmethod
1833 def tearDownClass(cls):
1834 support.unlink(support.TESTFN)
1835
1836 def setUp(self):
1837 self.server = SendfileTestServer((support.HOST, 0))
1838 self.server.start()
1839 self.client = socket.socket()
1840 self.client.connect((self.server.host, self.server.port))
1841 self.client.settimeout(1)
1842 # synchronize by waiting for "220 ready" response
1843 self.client.recv(1024)
1844 self.sockno = self.client.fileno()
1845 self.file = open(support.TESTFN, 'rb')
1846 self.fileno = self.file.fileno()
1847
1848 def tearDown(self):
1849 self.file.close()
1850 self.client.close()
1851 if self.server.running:
1852 self.server.stop()
1853
1854 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1855 """A higher level wrapper representing how an application is
1856 supposed to use sendfile().
1857 """
1858 while 1:
1859 try:
1860 if self.SUPPORT_HEADERS_TRAILERS:
1861 return os.sendfile(sock, file, offset, nbytes, headers,
1862 trailers)
1863 else:
1864 return os.sendfile(sock, file, offset, nbytes)
1865 except OSError as err:
1866 if err.errno == errno.ECONNRESET:
1867 # disconnected
1868 raise
1869 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1870 # we have to retry send data
1871 continue
1872 else:
1873 raise
1874
1875 def test_send_whole_file(self):
1876 # normal send
1877 total_sent = 0
1878 offset = 0
1879 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001880 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001881 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1882 if sent == 0:
1883 break
1884 offset += sent
1885 total_sent += sent
1886 self.assertTrue(sent <= nbytes)
1887 self.assertEqual(offset, total_sent)
1888
1889 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001890 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001891 self.client.close()
1892 self.server.wait()
1893 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001894 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001895 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001896
1897 def test_send_at_certain_offset(self):
1898 # start sending a file at a certain offset
1899 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001900 offset = len(self.DATA) // 2
1901 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001902 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001903 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001904 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1905 if sent == 0:
1906 break
1907 offset += sent
1908 total_sent += sent
1909 self.assertTrue(sent <= nbytes)
1910
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001911 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001912 self.client.close()
1913 self.server.wait()
1914 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001915 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001916 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001917 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001918 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001919
1920 def test_offset_overflow(self):
1921 # specify an offset > file size
1922 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001923 try:
1924 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1925 except OSError as e:
1926 # Solaris can raise EINVAL if offset >= file length, ignore.
1927 if e.errno != errno.EINVAL:
1928 raise
1929 else:
1930 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001931 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001932 self.client.close()
1933 self.server.wait()
1934 data = self.server.handler_instance.get_data()
1935 self.assertEqual(data, b'')
1936
1937 def test_invalid_offset(self):
1938 with self.assertRaises(OSError) as cm:
1939 os.sendfile(self.sockno, self.fileno, -1, 4096)
1940 self.assertEqual(cm.exception.errno, errno.EINVAL)
1941
1942 # --- headers / trailers tests
1943
Serhiy Storchaka79080682013-11-03 21:31:18 +02001944 @requires_headers_trailers
1945 def test_headers(self):
1946 total_sent = 0
1947 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1948 headers=[b"x" * 512])
1949 total_sent += sent
1950 offset = 4096
1951 nbytes = 4096
1952 while 1:
1953 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1954 offset, nbytes)
1955 if sent == 0:
1956 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001957 total_sent += sent
Serhiy Storchaka79080682013-11-03 21:31:18 +02001958 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001959
Serhiy Storchaka79080682013-11-03 21:31:18 +02001960 expected_data = b"x" * 512 + self.DATA
1961 self.assertEqual(total_sent, len(expected_data))
1962 self.client.close()
1963 self.server.wait()
1964 data = self.server.handler_instance.get_data()
1965 self.assertEqual(hash(data), hash(expected_data))
1966
1967 @requires_headers_trailers
1968 def test_trailers(self):
1969 TESTFN2 = support.TESTFN + "2"
1970 file_data = b"abcdef"
1971 with open(TESTFN2, 'wb') as f:
1972 f.write(file_data)
1973 with open(TESTFN2, 'rb')as f:
1974 self.addCleanup(os.remove, TESTFN2)
1975 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
1976 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001977 self.client.close()
1978 self.server.wait()
1979 data = self.server.handler_instance.get_data()
Serhiy Storchaka79080682013-11-03 21:31:18 +02001980 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001981
Serhiy Storchaka79080682013-11-03 21:31:18 +02001982 @requires_headers_trailers
1983 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
1984 'test needs os.SF_NODISKIO')
1985 def test_flags(self):
1986 try:
1987 os.sendfile(self.sockno, self.fileno, 0, 4096,
1988 flags=os.SF_NODISKIO)
1989 except OSError as err:
1990 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1991 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001992
1993
Larry Hastings9cf065c2012-06-22 16:30:09 -07001994def supports_extended_attributes():
1995 if not hasattr(os, "setxattr"):
1996 return False
1997 try:
1998 with open(support.TESTFN, "wb") as fp:
1999 try:
2000 os.setxattr(fp.fileno(), b"user.test", b"")
2001 except OSError:
2002 return False
2003 finally:
2004 support.unlink(support.TESTFN)
2005 # Kernels < 2.6.39 don't respect setxattr flags.
2006 kernel_version = platform.release()
2007 m = re.match("2.6.(\d{1,2})", kernel_version)
2008 return m is None or int(m.group(1)) >= 39
2009
2010
2011@unittest.skipUnless(supports_extended_attributes(),
2012 "no non-broken extended attribute support")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002013class ExtendedAttributeTests(unittest.TestCase):
2014
2015 def tearDown(self):
2016 support.unlink(support.TESTFN)
2017
Larry Hastings9cf065c2012-06-22 16:30:09 -07002018 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002019 fn = support.TESTFN
2020 open(fn, "wb").close()
2021 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002022 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002023 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002024 init_xattr = listxattr(fn)
2025 self.assertIsInstance(init_xattr, list)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002026 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002027 xattr = set(init_xattr)
2028 xattr.add("user.test")
2029 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002030 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2031 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2032 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002033 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002034 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002035 self.assertEqual(cm.exception.errno, errno.EEXIST)
2036 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002037 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002038 self.assertEqual(cm.exception.errno, errno.ENODATA)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002039 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002040 xattr.add("user.test2")
2041 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002042 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002043 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002044 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002045 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002046 xattr.remove("user.test")
2047 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002048 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2049 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2050 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2051 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002052 many = sorted("user.test{}".format(i) for i in range(100))
2053 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002054 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002055 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002056
Larry Hastings9cf065c2012-06-22 16:30:09 -07002057 def _check_xattrs(self, *args, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002058 def make_bytes(s):
2059 return bytes(s, "ascii")
Larry Hastings9cf065c2012-06-22 16:30:09 -07002060 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002061 support.unlink(support.TESTFN)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002062 self._check_xattrs_str(make_bytes, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002063
2064 def test_simple(self):
2065 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2066 os.listxattr)
2067
2068 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002069 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2070 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002071
2072 def test_fds(self):
2073 def getxattr(path, *args):
2074 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002075 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002076 def setxattr(path, *args):
2077 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002078 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002079 def removexattr(path, *args):
2080 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002081 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002082 def listxattr(path, *args):
2083 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002084 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002085 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2086
2087
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002088@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2089class Win32DeprecatedBytesAPI(unittest.TestCase):
2090 def test_deprecated(self):
2091 import nt
2092 filename = os.fsencode(support.TESTFN)
2093 with warnings.catch_warnings():
2094 warnings.simplefilter("error", DeprecationWarning)
2095 for func, *args in (
2096 (nt._getfullpathname, filename),
2097 (nt._isdir, filename),
2098 (os.access, filename, os.R_OK),
2099 (os.chdir, filename),
2100 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01002101 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002102 (os.link, filename, filename),
2103 (os.listdir, filename),
2104 (os.lstat, filename),
2105 (os.mkdir, filename),
2106 (os.open, filename, os.O_RDONLY),
2107 (os.rename, filename, filename),
2108 (os.rmdir, filename),
2109 (os.startfile, filename),
2110 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002111 (os.unlink, filename),
2112 (os.utime, filename),
2113 ):
2114 self.assertRaises(DeprecationWarning, func, *args)
2115
Victor Stinner28216442011-11-16 00:34:44 +01002116 @support.skip_unless_symlink
2117 def test_symlink(self):
2118 filename = os.fsencode(support.TESTFN)
2119 with warnings.catch_warnings():
2120 warnings.simplefilter("error", DeprecationWarning)
2121 self.assertRaises(DeprecationWarning,
2122 os.symlink, filename, filename)
2123
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002124
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002125@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2126class TermsizeTests(unittest.TestCase):
2127 def test_does_not_crash(self):
2128 """Check if get_terminal_size() returns a meaningful value.
2129
2130 There's no easy portable way to actually check the size of the
2131 terminal, so let's check if it returns something sensible instead.
2132 """
2133 try:
2134 size = os.get_terminal_size()
2135 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002136 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002137 # Under win32 a generic OSError can be thrown if the
2138 # handle cannot be retrieved
2139 self.skipTest("failed to query terminal size")
2140 raise
2141
Antoine Pitroucfade362012-02-08 23:48:59 +01002142 self.assertGreaterEqual(size.columns, 0)
2143 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002144
2145 def test_stty_match(self):
2146 """Check if stty returns the same results
2147
2148 stty actually tests stdin, so get_terminal_size is invoked on
2149 stdin explicitly. If stty succeeded, then get_terminal_size()
2150 should work too.
2151 """
2152 try:
2153 size = subprocess.check_output(['stty', 'size']).decode().split()
2154 except (FileNotFoundError, subprocess.CalledProcessError):
2155 self.skipTest("stty invocation failed")
2156 expected = (int(size[1]), int(size[0])) # reversed order
2157
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002158 try:
2159 actual = os.get_terminal_size(sys.__stdin__.fileno())
2160 except OSError as e:
2161 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2162 # Under win32 a generic OSError can be thrown if the
2163 # handle cannot be retrieved
2164 self.skipTest("failed to query terminal size")
2165 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002166 self.assertEqual(expected, actual)
2167
2168
Antoine Pitrouf26ad712011-07-15 23:00:56 +02002169@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00002170def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002171 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00002172 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002173 StatAttributeTests,
2174 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002175 WalkTests,
Charles-François Natali7372b062012-02-05 15:15:38 +01002176 FwalkTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002177 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00002178 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00002179 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00002180 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002181 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002182 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00002183 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00002184 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002185 Win32KillTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00002186 Win32SymlinkTests,
Jason R. Coombs3a092862013-05-27 23:21:28 -04002187 NonLocalSymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00002188 FSEncodingTests,
Brett Cannonefb00c02012-02-29 18:31:31 -05002189 DeviceEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002190 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002191 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00002192 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002193 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002194 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04002195 ExtendedAttributeTests,
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002196 Win32DeprecatedBytesAPI,
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002197 TermsizeTests,
Andrew Svetlov405faed2012-12-25 12:18:09 +02002198 RemoveDirsTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002199 )
Fred Drake2e2be372001-09-20 21:33:42 +00002200
2201if __name__ == "__main__":
2202 test_main()