blob: d70a0aeaf84a92d17cb4869d21cfa2cd399ad69b [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000027try:
28 import threading
29except ImportError:
30 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020031try:
32 import resource
33except ImportError:
34 resource = None
35
Georg Brandl2daf6ae2012-02-20 19:54:16 +010036from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000037
Victor Stinner034d0aa2012-06-05 01:22:15 +020038with warnings.catch_warnings():
39 warnings.simplefilter("ignore", DeprecationWarning)
40 os.stat_float_times(True)
Victor Stinner1aa54a42012-02-08 04:09:37 +010041st = os.stat(__file__)
42stat_supports_subsecond = (
43 # check if float and int timestamps are different
44 (st.st_atime != st[7])
45 or (st.st_mtime != st[8])
46 or (st.st_ctime != st[9]))
47
Mark Dickinson7cf03892010-04-16 13:45:35 +000048# Detect whether we're on a Linux system that uses the (now outdated
49# and unmaintained) linuxthreads threading library. There's an issue
50# when combining linuxthreads with a failed execv call: see
51# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020052if hasattr(sys, 'thread_info') and sys.thread_info.version:
53 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
54else:
55 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000056
Stefan Krahebee49a2013-01-17 15:31:00 +010057# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
58HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
59
Thomas Wouters0e3f5912006-08-11 14:57:12 +000060# Tests creating TESTFN
61class FileTests(unittest.TestCase):
62 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000063 if os.path.exists(support.TESTFN):
64 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000065 tearDown = setUp
66
67 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000068 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000069 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000070 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000071
Christian Heimesfdab48e2008-01-20 09:06:41 +000072 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000073 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
74 # We must allocate two consecutive file descriptors, otherwise
75 # it will mess up other file descriptors (perhaps even the three
76 # standard ones).
77 second = os.dup(first)
78 try:
79 retries = 0
80 while second != first + 1:
81 os.close(first)
82 retries += 1
83 if retries > 10:
84 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000085 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000086 first, second = second, os.dup(second)
87 finally:
88 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000089 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000090 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000091 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000092
Benjamin Peterson1cc6df92010-06-30 17:39:45 +000093 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +000094 def test_rename(self):
95 path = support.TESTFN
96 old = sys.getrefcount(path)
97 self.assertRaises(TypeError, os.rename, path, 0)
98 new = sys.getrefcount(path)
99 self.assertEqual(old, new)
100
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000101 def test_read(self):
102 with open(support.TESTFN, "w+b") as fobj:
103 fobj.write(b"spam")
104 fobj.flush()
105 fd = fobj.fileno()
106 os.lseek(fd, 0, 0)
107 s = os.read(fd, 4)
108 self.assertEqual(type(s), bytes)
109 self.assertEqual(s, b"spam")
110
111 def test_write(self):
112 # os.write() accepts bytes- and buffer-like objects but not strings
113 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
114 self.assertRaises(TypeError, os.write, fd, "beans")
115 os.write(fd, b"bacon\n")
116 os.write(fd, bytearray(b"eggs\n"))
117 os.write(fd, memoryview(b"spam\n"))
118 os.close(fd)
119 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000120 self.assertEqual(fobj.read().splitlines(),
121 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000122
Victor Stinnere0daff12011-03-20 23:36:35 +0100123 def write_windows_console(self, *args):
124 retcode = subprocess.call(args,
125 # use a new console to not flood the test output
126 creationflags=subprocess.CREATE_NEW_CONSOLE,
127 # use a shell to hide the console window (SW_HIDE)
128 shell=True)
129 self.assertEqual(retcode, 0)
130
131 @unittest.skipUnless(sys.platform == 'win32',
132 'test specific to the Windows console')
133 def test_write_windows_console(self):
134 # Issue #11395: the Windows console returns an error (12: not enough
135 # space error) on writing into stdout if stdout mode is binary and the
136 # length is greater than 66,000 bytes (or less, depending on heap
137 # usage).
138 code = "print('x' * 100000)"
139 self.write_windows_console(sys.executable, "-c", code)
140 self.write_windows_console(sys.executable, "-u", "-c", code)
141
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000142 def fdopen_helper(self, *args):
143 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200144 f = os.fdopen(fd, *args)
145 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000146
147 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200148 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
149 os.close(fd)
150
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000151 self.fdopen_helper()
152 self.fdopen_helper('r')
153 self.fdopen_helper('r', 100)
154
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100155 def test_replace(self):
156 TESTFN2 = support.TESTFN + ".2"
157 with open(support.TESTFN, 'w') as f:
158 f.write("1")
159 with open(TESTFN2, 'w') as f:
160 f.write("2")
161 self.addCleanup(os.unlink, TESTFN2)
162 os.replace(support.TESTFN, TESTFN2)
163 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
164 with open(TESTFN2, 'r') as f:
165 self.assertEqual(f.read(), "1")
166
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200167
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000168# Test attributes on return values from os.*stat* family.
169class StatAttributeTests(unittest.TestCase):
170 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000171 os.mkdir(support.TESTFN)
172 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000173 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000174 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000175 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000176
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000177 def tearDown(self):
178 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000179 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000180
Serhiy Storchaka79080682013-11-03 21:31:18 +0200181 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000182 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000183 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000184
185 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000186 self.assertEqual(result[stat.ST_SIZE], 3)
187 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000188
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000189 # Make sure all the attributes are there
190 members = dir(result)
191 for name in dir(stat):
192 if name[:3] == 'ST_':
193 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000194 if name.endswith("TIME"):
195 def trunc(x): return int(x)
196 else:
197 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000198 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000199 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000200 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000201
Larry Hastings6fe20b32012-04-19 15:07:49 -0700202 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700203 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700204 for name in 'st_atime st_mtime st_ctime'.split():
205 floaty = int(getattr(result, name) * 100000)
206 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700207 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700208
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000209 try:
210 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200211 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000212 except IndexError:
213 pass
214
215 # Make sure that assignment fails
216 try:
217 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200218 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000219 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000220 pass
221
222 try:
223 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200224 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000225 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000226 pass
227
228 try:
229 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200230 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000231 except AttributeError:
232 pass
233
234 # Use the stat_result constructor with a too-short tuple.
235 try:
236 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200237 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000238 except TypeError:
239 pass
240
Ezio Melotti42da6632011-03-15 05:18:48 +0200241 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000242 try:
243 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
244 except TypeError:
245 pass
246
Antoine Pitrou38425292010-09-21 18:19:07 +0000247 def test_stat_attributes(self):
248 self.check_stat_attributes(self.fname)
249
250 def test_stat_attributes_bytes(self):
251 try:
252 fname = self.fname.encode(sys.getfilesystemencoding())
253 except UnicodeEncodeError:
254 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100255 with warnings.catch_warnings():
256 warnings.simplefilter("ignore", DeprecationWarning)
257 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000258
Serhiy Storchaka79080682013-11-03 21:31:18 +0200259 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000260 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000261 try:
262 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000263 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000264 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000265 if e.errno == errno.ENOSYS:
Zachary Ware9fe6d862013-12-08 00:20:35 -0600266 self.skipTest('glibc always returns ENOSYS on AtheOS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000267
268 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000269 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000270
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000271 # Make sure all the attributes are there.
272 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
273 'ffree', 'favail', 'flag', 'namemax')
274 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000275 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000276
277 # Make sure that assignment really fails
278 try:
279 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200280 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000281 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000282 pass
283
284 try:
285 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200286 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000287 except AttributeError:
288 pass
289
290 # Use the constructor with a too-short tuple.
291 try:
292 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200293 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000294 except TypeError:
295 pass
296
Ezio Melotti42da6632011-03-15 05:18:48 +0200297 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000298 try:
299 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
300 except TypeError:
301 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000302
Thomas Wouters89f507f2006-12-13 04:49:30 +0000303 def test_utime_dir(self):
304 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000305 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000306 # round to int, because some systems may support sub-second
307 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000308 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
309 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000310 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000311
Larry Hastings76ad59b2012-05-03 00:30:07 -0700312 def _test_utime(self, filename, attr, utime, delta):
Brian Curtin0277aa32011-11-06 13:50:15 -0600313 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600314 # second argument. Check that the previous methods of passing
315 # a time tuple or None work in addition to no argument.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700316 st0 = os.stat(filename)
Brian Curtin52fbea12011-11-06 13:41:17 -0600317 # Doesn't set anything new, but sets the time tuple way
Larry Hastings76ad59b2012-05-03 00:30:07 -0700318 utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
319 # Setting the time to the time you just read, then reading again,
320 # should always return exactly the same times.
321 st1 = os.stat(filename)
322 self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
323 self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
Brian Curtin52fbea12011-11-06 13:41:17 -0600324 # Set to the current time in the old explicit way.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700325 os.utime(filename, None)
Brian Curtin52fbea12011-11-06 13:41:17 -0600326 st2 = os.stat(support.TESTFN)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700327 # Set to the current time in the new way
328 os.utime(filename)
329 st3 = os.stat(filename)
330 self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)
331
332 def test_utime(self):
333 def utime(file, times):
334 return os.utime(file, times)
335 self._test_utime(self.fname, getattr, utime, 10)
336 self._test_utime(support.TESTFN, getattr, utime, 10)
337
338
339 def _test_utime_ns(self, set_times_ns, test_dir=True):
340 def getattr_ns(o, attr):
341 return getattr(o, attr + "_ns")
342 ten_s = 10 * 1000 * 1000 * 1000
343 self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
344 if test_dir:
345 self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)
346
347 def test_utime_ns(self):
348 def utime_ns(file, times):
349 return os.utime(file, ns=times)
350 self._test_utime_ns(utime_ns)
351
Larry Hastings9cf065c2012-06-22 16:30:09 -0700352 requires_utime_dir_fd = unittest.skipUnless(
353 os.utime in os.supports_dir_fd,
354 "dir_fd support for utime required for this test.")
355 requires_utime_fd = unittest.skipUnless(
356 os.utime in os.supports_fd,
357 "fd support for utime required for this test.")
358 requires_utime_nofollow_symlinks = unittest.skipUnless(
359 os.utime in os.supports_follow_symlinks,
360 "follow_symlinks support for utime required for this test.")
Larry Hastings76ad59b2012-05-03 00:30:07 -0700361
Larry Hastings9cf065c2012-06-22 16:30:09 -0700362 @requires_utime_nofollow_symlinks
Larry Hastings76ad59b2012-05-03 00:30:07 -0700363 def test_lutimes_ns(self):
364 def lutimes_ns(file, times):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700365 return os.utime(file, ns=times, follow_symlinks=False)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700366 self._test_utime_ns(lutimes_ns)
367
Larry Hastings9cf065c2012-06-22 16:30:09 -0700368 @requires_utime_fd
Larry Hastings76ad59b2012-05-03 00:30:07 -0700369 def test_futimes_ns(self):
370 def futimes_ns(file, times):
371 with open(file, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700372 os.utime(f.fileno(), ns=times)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700373 self._test_utime_ns(futimes_ns, test_dir=False)
374
375 def _utime_invalid_arguments(self, name, arg):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700376 with self.assertRaises(ValueError):
Larry Hastings76ad59b2012-05-03 00:30:07 -0700377 getattr(os, name)(arg, (5, 5), ns=(5, 5))
378
379 def test_utime_invalid_arguments(self):
380 self._utime_invalid_arguments('utime', self.fname)
381
Brian Curtin52fbea12011-11-06 13:41:17 -0600382
Victor Stinner1aa54a42012-02-08 04:09:37 +0100383 @unittest.skipUnless(stat_supports_subsecond,
384 "os.stat() doesn't has a subsecond resolution")
Victor Stinnera2f7c002012-02-08 03:36:25 +0100385 def _test_utime_subsecond(self, set_time_func):
Victor Stinnerbe557de2012-02-08 03:01:11 +0100386 asec, amsec = 1, 901
387 atime = asec + amsec * 1e-3
Victor Stinnera2f7c002012-02-08 03:36:25 +0100388 msec, mmsec = 2, 901
Victor Stinnerbe557de2012-02-08 03:01:11 +0100389 mtime = msec + mmsec * 1e-3
390 filename = self.fname
Victor Stinnera2f7c002012-02-08 03:36:25 +0100391 os.utime(filename, (0, 0))
392 set_time_func(filename, atime, mtime)
Victor Stinner034d0aa2012-06-05 01:22:15 +0200393 with warnings.catch_warnings():
394 warnings.simplefilter("ignore", DeprecationWarning)
395 os.stat_float_times(True)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100396 st = os.stat(filename)
397 self.assertAlmostEqual(st.st_atime, atime, places=3)
398 self.assertAlmostEqual(st.st_mtime, mtime, places=3)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100399
Victor Stinnera2f7c002012-02-08 03:36:25 +0100400 def test_utime_subsecond(self):
401 def set_time(filename, atime, mtime):
402 os.utime(filename, (atime, mtime))
403 self._test_utime_subsecond(set_time)
404
Larry Hastings9cf065c2012-06-22 16:30:09 -0700405 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100406 def test_futimes_subsecond(self):
407 def set_time(filename, atime, mtime):
408 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700409 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100410 self._test_utime_subsecond(set_time)
411
Larry Hastings9cf065c2012-06-22 16:30:09 -0700412 @requires_utime_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100413 def test_futimens_subsecond(self):
414 def set_time(filename, atime, mtime):
415 with open(filename, "wb") as f:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700416 os.utime(f.fileno(), times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100417 self._test_utime_subsecond(set_time)
418
Larry Hastings9cf065c2012-06-22 16:30:09 -0700419 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100420 def test_futimesat_subsecond(self):
421 def set_time(filename, atime, mtime):
422 dirname = os.path.dirname(filename)
423 dirfd = os.open(dirname, os.O_RDONLY)
424 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700425 os.utime(os.path.basename(filename), dir_fd=dirfd,
426 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100427 finally:
428 os.close(dirfd)
429 self._test_utime_subsecond(set_time)
430
Larry Hastings9cf065c2012-06-22 16:30:09 -0700431 @requires_utime_nofollow_symlinks
Victor Stinnera2f7c002012-02-08 03:36:25 +0100432 def test_lutimes_subsecond(self):
433 def set_time(filename, atime, mtime):
Larry Hastings9cf065c2012-06-22 16:30:09 -0700434 os.utime(filename, (atime, mtime), follow_symlinks=False)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100435 self._test_utime_subsecond(set_time)
436
Larry Hastings9cf065c2012-06-22 16:30:09 -0700437 @requires_utime_dir_fd
Victor Stinnera2f7c002012-02-08 03:36:25 +0100438 def test_utimensat_subsecond(self):
439 def set_time(filename, atime, mtime):
440 dirname = os.path.dirname(filename)
441 dirfd = os.open(dirname, os.O_RDONLY)
442 try:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700443 os.utime(os.path.basename(filename), dir_fd=dirfd,
444 times=(atime, mtime))
Victor Stinnera2f7c002012-02-08 03:36:25 +0100445 finally:
446 os.close(dirfd)
447 self._test_utime_subsecond(set_time)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100448
Serhiy Storchaka79080682013-11-03 21:31:18 +0200449 # Restrict tests to Win32, since there is no guarantee other
Thomas Wouters89f507f2006-12-13 04:49:30 +0000450 # systems support centiseconds
Serhiy Storchaka79080682013-11-03 21:31:18 +0200451 def get_file_system(path):
452 if sys.platform == 'win32':
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000453 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000454 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000455 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000456 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000457 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000458 return buf.value
459
Serhiy Storchaka79080682013-11-03 21:31:18 +0200460 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
461 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
462 "requires NTFS")
463 def test_1565150(self):
464 t1 = 1159195039.25
465 os.utime(self.fname, (t1, t1))
466 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000467
Serhiy Storchaka79080682013-11-03 21:31:18 +0200468 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
469 @unittest.skipUnless(get_file_system(support.TESTFN) == "NTFS",
470 "requires NTFS")
471 def test_large_time(self):
472 t1 = 5000000000 # some day in 2128
473 os.utime(self.fname, (t1, t1))
474 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000475
Serhiy Storchaka79080682013-11-03 21:31:18 +0200476 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
477 def test_1686475(self):
478 # Verify that an open file can be stat'ed
479 try:
480 os.stat(r"c:\pagefile.sys")
481 except WindowsError as e:
482 if e.errno == 2: # file does not exist; cannot run test
Zachary Ware9fe6d862013-12-08 00:20:35 -0600483 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka79080682013-11-03 21:31:18 +0200484 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000485
Serhiy Storchaka79080682013-11-03 21:31:18 +0200486 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
487 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
488 def test_15261(self):
489 # Verify that stat'ing a closed fd does not cause crash
490 r, w = os.pipe()
491 try:
492 os.stat(r) # should not raise error
493 finally:
494 os.close(r)
495 os.close(w)
496 with self.assertRaises(OSError) as ctx:
497 os.stat(r)
498 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100499
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000500from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000501
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000502class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000503 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000504 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000505
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000506 def setUp(self):
507 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000508 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000509 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000510 for key, value in self._reference().items():
511 os.environ[key] = value
512
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000513 def tearDown(self):
514 os.environ.clear()
515 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000516 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000517 os.environb.clear()
518 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000519
Christian Heimes90333392007-11-01 19:08:42 +0000520 def _reference(self):
521 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
522
523 def _empty_mapping(self):
524 os.environ.clear()
525 return os.environ
526
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000527 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300528 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000529 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000530 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300531 os.environ.update(HELLO="World")
532 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
533 value = popen.read().strip()
534 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000535
Ezio Melottic7e139b2012-09-26 20:01:34 +0300536 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000537 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300538 with os.popen(
539 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
540 it = iter(popen)
541 self.assertEqual(next(it), "line1\n")
542 self.assertEqual(next(it), "line2\n")
543 self.assertEqual(next(it), "line3\n")
544 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000545
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000546 # Verify environ keys and values from the OS are of the
547 # correct str type.
548 def test_keyvalue_types(self):
549 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000550 self.assertEqual(type(key), str)
551 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000552
Christian Heimes90333392007-11-01 19:08:42 +0000553 def test_items(self):
554 for key, value in self._reference().items():
555 self.assertEqual(os.environ.get(key), value)
556
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000557 # Issue 7310
558 def test___repr__(self):
559 """Check that the repr() of os.environ looks like environ({...})."""
560 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000561 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
562 '{!r}: {!r}'.format(key, value)
563 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000564
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000565 def test_get_exec_path(self):
566 defpath_list = os.defpath.split(os.pathsep)
567 test_path = ['/monty', '/python', '', '/flying/circus']
568 test_env = {'PATH': os.pathsep.join(test_path)}
569
570 saved_environ = os.environ
571 try:
572 os.environ = dict(test_env)
573 # Test that defaulting to os.environ works.
574 self.assertSequenceEqual(test_path, os.get_exec_path())
575 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
576 finally:
577 os.environ = saved_environ
578
579 # No PATH environment variable
580 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
581 # Empty PATH environment variable
582 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
583 # Supplied PATH environment variable
584 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
585
Victor Stinnerb745a742010-05-18 17:17:23 +0000586 if os.supports_bytes_environ:
587 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000588 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000589 # ignore BytesWarning warning
590 with warnings.catch_warnings(record=True):
591 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000592 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000593 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000594 pass
595 else:
596 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000597
598 # bytes key and/or value
599 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
600 ['abc'])
601 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
602 ['abc'])
603 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
604 ['abc'])
605
606 @unittest.skipUnless(os.supports_bytes_environ,
607 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000608 def test_environb(self):
609 # os.environ -> os.environb
610 value = 'euro\u20ac'
611 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000612 value_bytes = value.encode(sys.getfilesystemencoding(),
613 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000614 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000615 msg = "U+20AC character is not encodable to %s" % (
616 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000617 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000618 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000619 self.assertEqual(os.environ['unicode'], value)
620 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000621
622 # os.environb -> os.environ
623 value = b'\xff'
624 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000625 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000626 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000627 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000628
Charles-François Natali2966f102011-11-26 11:32:46 +0100629 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
630 # #13415).
631 @support.requires_freebsd_version(7)
632 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100633 def test_unset_error(self):
634 if sys.platform == "win32":
635 # an environment variable is limited to 32,767 characters
636 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100637 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100638 else:
639 # "=" is not allowed in a variable name
640 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100641 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100642
Victor Stinner6d101392013-04-14 16:35:04 +0200643 def test_key_type(self):
644 missing = 'missingkey'
645 self.assertNotIn(missing, os.environ)
646
Victor Stinner839e5ea2013-04-14 16:43:03 +0200647 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200648 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200649 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200650 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200651
Victor Stinner839e5ea2013-04-14 16:43:03 +0200652 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200653 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200654 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200655 self.assertTrue(cm.exception.__suppress_context__)
656
Victor Stinner6d101392013-04-14 16:35:04 +0200657
Tim Petersc4e09402003-04-25 07:11:48 +0000658class WalkTests(unittest.TestCase):
659 """Tests for os.walk()."""
660
Charles-François Natali7372b062012-02-05 15:15:38 +0100661 def setUp(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000662 import os
663 from os.path import join
664
665 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000666 # TESTFN/
667 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000668 # tmp1
669 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000670 # tmp2
671 # SUB11/ no kids
672 # SUB2/ a file kid and a dirsymlink kid
673 # tmp3
674 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200675 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000676 # TEST2/
677 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000678 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000679 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000680 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000681 sub2_path = join(walk_path, "SUB2")
682 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000683 tmp2_path = join(sub1_path, "tmp2")
684 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000685 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000686 t2_path = join(support.TESTFN, "TEST2")
687 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200688 link_path = join(sub2_path, "link")
689 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000690
691 # Create stuff.
692 os.makedirs(sub11_path)
693 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000694 os.makedirs(t2_path)
695 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000696 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000697 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
698 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000699 if support.can_symlink():
Jason R. Coombs3a092862013-05-27 23:21:28 -0400700 os.symlink(os.path.abspath(t2_path), link_path)
Jason R. Coombsb501b562013-05-27 23:52:43 -0400701 os.symlink('broken', broken_link_path, True)
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200702 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
Guido van Rossumd8faa362007-04-27 19:54:29 +0000703 else:
704 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000705
706 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000707 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000708 self.assertEqual(len(all), 4)
709 # We can't know which order SUB1 and SUB2 will appear in.
710 # Not flipped: TESTFN, SUB1, SUB11, SUB2
711 # flipped: TESTFN, SUB2, SUB1, SUB11
712 flipped = all[0][1][0] != "SUB1"
713 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200714 all[3 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000715 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000716 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
717 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000718 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000719
720 # Prune the search.
721 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000722 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000723 all.append((root, dirs, files))
724 # Don't descend into SUB1.
725 if 'SUB1' in dirs:
726 # Note that this also mutates the dirs we appended to all!
727 dirs.remove('SUB1')
728 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000729 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200730 all[1][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000731 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000732
733 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000734 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000735 self.assertEqual(len(all), 4)
736 # We can't know which order SUB1 and SUB2 will appear in.
737 # Not flipped: SUB11, SUB1, SUB2, TESTFN
738 # flipped: SUB2, SUB11, SUB1, TESTFN
739 flipped = all[3][1][0] != "SUB1"
740 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200741 all[2 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000742 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000743 self.assertEqual(all[flipped], (sub11_path, [], []))
744 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000745 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000746
Brian Curtin3b4499c2010-12-28 14:31:47 +0000747 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000748 # Walk, following symlinks.
749 for root, dirs, files in os.walk(walk_path, followlinks=True):
750 if root == link_path:
751 self.assertEqual(dirs, [])
752 self.assertEqual(files, ["tmp4"])
753 break
754 else:
755 self.fail("Didn't follow symlink with followlinks=True")
756
757 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000758 # Tear everything down. This is a decent use for bottom-up on
759 # Windows, which doesn't have a recursive delete command. The
760 # (not so) subtlety is that rmdir will fail unless the dir's
761 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000762 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000763 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000764 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000765 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000766 dirname = os.path.join(root, name)
767 if not os.path.islink(dirname):
768 os.rmdir(dirname)
769 else:
770 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000771 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000772
Charles-François Natali7372b062012-02-05 15:15:38 +0100773
774@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
775class FwalkTests(WalkTests):
776 """Tests for os.fwalk()."""
777
Larry Hastingsc48fe982012-06-25 04:49:05 -0700778 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
779 """
780 compare with walk() results.
781 """
Larry Hastingsb4038062012-07-15 10:57:38 -0700782 walk_kwargs = walk_kwargs.copy()
783 fwalk_kwargs = fwalk_kwargs.copy()
784 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
785 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
786 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -0700787
Charles-François Natali7372b062012-02-05 15:15:38 +0100788 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -0700789 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100790 expected[root] = (set(dirs), set(files))
791
Larry Hastingsc48fe982012-06-25 04:49:05 -0700792 for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100793 self.assertIn(root, expected)
794 self.assertEqual(expected[root], (set(dirs), set(files)))
795
Larry Hastingsc48fe982012-06-25 04:49:05 -0700796 def test_compare_to_walk(self):
797 kwargs = {'top': support.TESTFN}
798 self._compare_to_walk(kwargs, kwargs)
799
Charles-François Natali7372b062012-02-05 15:15:38 +0100800 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -0700801 try:
802 fd = os.open(".", os.O_RDONLY)
803 walk_kwargs = {'top': support.TESTFN}
804 fwalk_kwargs = walk_kwargs.copy()
805 fwalk_kwargs['dir_fd'] = fd
806 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
807 finally:
808 os.close(fd)
809
810 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +0100811 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -0700812 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
813 args = support.TESTFN, topdown, None
814 for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +0100815 # check that the FD is valid
816 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700817 # redundant check
818 os.stat(rootfd)
819 # check that listdir() returns consistent information
820 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100821
822 def test_fd_leak(self):
823 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
824 # we both check that calling fwalk() a large number of times doesn't
825 # yield EMFILE, and that the minimum allocated FD hasn't changed.
826 minfd = os.dup(1)
827 os.close(minfd)
828 for i in range(256):
829 for x in os.fwalk(support.TESTFN):
830 pass
831 newfd = os.dup(1)
832 self.addCleanup(os.close, newfd)
833 self.assertEqual(newfd, minfd)
834
835 def tearDown(self):
836 # cleanup
837 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
838 for name in files:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700839 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100840 for name in dirs:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700841 st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
Larry Hastingsb698d8e2012-06-23 16:55:07 -0700842 if stat.S_ISDIR(st.st_mode):
843 os.rmdir(name, dir_fd=rootfd)
844 else:
845 os.unlink(name, dir_fd=rootfd)
Charles-François Natali7372b062012-02-05 15:15:38 +0100846 os.rmdir(support.TESTFN)
847
848
Guido van Rossume7ba4952007-06-06 23:52:48 +0000849class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000850 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000851 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000852
853 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000854 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000855 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
856 os.makedirs(path) # Should work
857 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
858 os.makedirs(path)
859
860 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000861 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000862 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
863 os.makedirs(path)
864 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
865 'dir5', 'dir6')
866 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000867
Terry Reedy5a22b652010-12-02 07:05:56 +0000868 def test_exist_ok_existing_directory(self):
869 path = os.path.join(support.TESTFN, 'dir1')
870 mode = 0o777
871 old_mask = os.umask(0o022)
872 os.makedirs(path, mode)
873 self.assertRaises(OSError, os.makedirs, path, mode)
874 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
875 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
876 os.makedirs(path, mode=mode, exist_ok=True)
877 os.umask(old_mask)
878
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700879 def test_exist_ok_s_isgid_directory(self):
880 path = os.path.join(support.TESTFN, 'dir1')
881 S_ISGID = stat.S_ISGID
882 mode = 0o777
883 old_mask = os.umask(0o022)
884 try:
885 existing_testfn_mode = stat.S_IMODE(
886 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -0700887 try:
888 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -0700889 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -0700890 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700891 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
892 raise unittest.SkipTest('No support for S_ISGID dir mode.')
893 # The os should apply S_ISGID from the parent dir for us, but
894 # this test need not depend on that behavior. Be explicit.
895 os.makedirs(path, mode | S_ISGID)
896 # http://bugs.python.org/issue14992
897 # Should not fail when the bit is already set.
898 os.makedirs(path, mode, exist_ok=True)
899 # remove the bit.
900 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
901 with self.assertRaises(OSError):
902 # Should fail when the bit is not already set when demanded.
903 os.makedirs(path, mode | S_ISGID, exist_ok=True)
904 finally:
905 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +0000906
907 def test_exist_ok_existing_regular_file(self):
908 base = support.TESTFN
909 path = os.path.join(support.TESTFN, 'dir1')
910 f = open(path, 'w')
911 f.write('abc')
912 f.close()
913 self.assertRaises(OSError, os.makedirs, path)
914 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
915 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
916 os.remove(path)
917
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000918 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000919 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000920 'dir4', 'dir5', 'dir6')
921 # If the tests failed, the bottom-most directory ('../dir6')
922 # may not have been created, so we look for the outermost directory
923 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000924 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000925 path = os.path.dirname(path)
926
927 os.removedirs(path)
928
Andrew Svetlov405faed2012-12-25 12:18:09 +0200929
930class RemoveDirsTests(unittest.TestCase):
931 def setUp(self):
932 os.makedirs(support.TESTFN)
933
934 def tearDown(self):
935 support.rmtree(support.TESTFN)
936
937 def test_remove_all(self):
938 dira = os.path.join(support.TESTFN, 'dira')
939 os.mkdir(dira)
940 dirb = os.path.join(dira, 'dirb')
941 os.mkdir(dirb)
942 os.removedirs(dirb)
943 self.assertFalse(os.path.exists(dirb))
944 self.assertFalse(os.path.exists(dira))
945 self.assertFalse(os.path.exists(support.TESTFN))
946
947 def test_remove_partial(self):
948 dira = os.path.join(support.TESTFN, 'dira')
949 os.mkdir(dira)
950 dirb = os.path.join(dira, 'dirb')
951 os.mkdir(dirb)
952 with open(os.path.join(dira, 'file.txt'), 'w') as f:
953 f.write('text')
954 os.removedirs(dirb)
955 self.assertFalse(os.path.exists(dirb))
956 self.assertTrue(os.path.exists(dira))
957 self.assertTrue(os.path.exists(support.TESTFN))
958
959 def test_remove_nothing(self):
960 dira = os.path.join(support.TESTFN, 'dira')
961 os.mkdir(dira)
962 dirb = os.path.join(dira, 'dirb')
963 os.mkdir(dirb)
964 with open(os.path.join(dirb, 'file.txt'), 'w') as f:
965 f.write('text')
966 with self.assertRaises(OSError):
967 os.removedirs(dirb)
968 self.assertTrue(os.path.exists(dirb))
969 self.assertTrue(os.path.exists(dira))
970 self.assertTrue(os.path.exists(support.TESTFN))
971
972
Guido van Rossume7ba4952007-06-06 23:52:48 +0000973class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000974 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +0200975 with open(os.devnull, 'wb') as f:
976 f.write(b'hello')
977 f.close()
978 with open(os.devnull, 'rb') as f:
979 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000980
Andrew Svetlov405faed2012-12-25 12:18:09 +0200981
Guido van Rossume7ba4952007-06-06 23:52:48 +0000982class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +0100983 def test_urandom_length(self):
984 self.assertEqual(len(os.urandom(0)), 0)
985 self.assertEqual(len(os.urandom(1)), 1)
986 self.assertEqual(len(os.urandom(10)), 10)
987 self.assertEqual(len(os.urandom(100)), 100)
988 self.assertEqual(len(os.urandom(1000)), 1000)
989
990 def test_urandom_value(self):
991 data1 = os.urandom(16)
992 data2 = os.urandom(16)
993 self.assertNotEqual(data1, data2)
994
995 def get_urandom_subprocess(self, count):
996 code = '\n'.join((
997 'import os, sys',
998 'data = os.urandom(%s)' % count,
999 'sys.stdout.buffer.write(data)',
1000 'sys.stdout.buffer.flush()'))
1001 out = assert_python_ok('-c', code)
1002 stdout = out[1]
1003 self.assertEqual(len(stdout), 16)
1004 return stdout
1005
1006 def test_urandom_subprocess(self):
1007 data1 = self.get_urandom_subprocess(16)
1008 data2 = self.get_urandom_subprocess(16)
1009 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001010
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001011 @unittest.skipUnless(resource, "test requires the resource module")
1012 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001013 # Check urandom() failing when it is not able to open /dev/random.
1014 # We spawn a new process to make the test more robust (if getrlimit()
1015 # failed to restore the file descriptor limit after this, the whole
1016 # test suite would crash; this actually happened on the OS X Tiger
1017 # buildbot).
1018 code = """if 1:
1019 import errno
1020 import os
1021 import resource
1022
1023 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1024 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1025 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001026 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001027 except OSError as e:
1028 assert e.errno == errno.EMFILE, e.errno
1029 else:
1030 raise AssertionError("OSError not raised")
1031 """
1032 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001033
1034
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001035@contextlib.contextmanager
1036def _execvpe_mockup(defpath=None):
1037 """
1038 Stubs out execv and execve functions when used as context manager.
1039 Records exec calls. The mock execv and execve functions always raise an
1040 exception as they would normally never return.
1041 """
1042 # A list of tuples containing (function name, first arg, args)
1043 # of calls to execv or execve that have been made.
1044 calls = []
1045
1046 def mock_execv(name, *args):
1047 calls.append(('execv', name, args))
1048 raise RuntimeError("execv called")
1049
1050 def mock_execve(name, *args):
1051 calls.append(('execve', name, args))
1052 raise OSError(errno.ENOTDIR, "execve called")
1053
1054 try:
1055 orig_execv = os.execv
1056 orig_execve = os.execve
1057 orig_defpath = os.defpath
1058 os.execv = mock_execv
1059 os.execve = mock_execve
1060 if defpath is not None:
1061 os.defpath = defpath
1062 yield calls
1063 finally:
1064 os.execv = orig_execv
1065 os.execve = orig_execve
1066 os.defpath = orig_defpath
1067
Guido van Rossume7ba4952007-06-06 23:52:48 +00001068class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001069 @unittest.skipIf(USING_LINUXTHREADS,
1070 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001071 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001072 self.assertRaises(OSError, os.execvpe, 'no such app-',
1073 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001074
Thomas Heller6790d602007-08-30 17:15:14 +00001075 def test_execvpe_with_bad_arglist(self):
1076 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1077
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001078 @unittest.skipUnless(hasattr(os, '_execvpe'),
1079 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001080 def _test_internal_execvpe(self, test_type):
1081 program_path = os.sep + 'absolutepath'
1082 if test_type is bytes:
1083 program = b'executable'
1084 fullpath = os.path.join(os.fsencode(program_path), program)
1085 native_fullpath = fullpath
1086 arguments = [b'progname', 'arg1', 'arg2']
1087 else:
1088 program = 'executable'
1089 arguments = ['progname', 'arg1', 'arg2']
1090 fullpath = os.path.join(program_path, program)
1091 if os.name != "nt":
1092 native_fullpath = os.fsencode(fullpath)
1093 else:
1094 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001095 env = {'spam': 'beans'}
1096
Victor Stinnerb745a742010-05-18 17:17:23 +00001097 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001098 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001099 self.assertRaises(RuntimeError,
1100 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001101 self.assertEqual(len(calls), 1)
1102 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1103
Victor Stinnerb745a742010-05-18 17:17:23 +00001104 # test os._execvpe() with a relative path:
1105 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001106 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001107 self.assertRaises(OSError,
1108 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001109 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001110 self.assertSequenceEqual(calls[0],
1111 ('execve', native_fullpath, (arguments, env)))
1112
1113 # test os._execvpe() with a relative path:
1114 # os.get_exec_path() reads the 'PATH' variable
1115 with _execvpe_mockup() as calls:
1116 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001117 if test_type is bytes:
1118 env_path[b'PATH'] = program_path
1119 else:
1120 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001121 self.assertRaises(OSError,
1122 os._execvpe, program, arguments, env=env_path)
1123 self.assertEqual(len(calls), 1)
1124 self.assertSequenceEqual(calls[0],
1125 ('execve', native_fullpath, (arguments, env_path)))
1126
1127 def test_internal_execvpe_str(self):
1128 self._test_internal_execvpe(str)
1129 if os.name != "nt":
1130 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001131
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001132
Serhiy Storchaka79080682013-11-03 21:31:18 +02001133@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001134class Win32ErrorTests(unittest.TestCase):
1135 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001136 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001137
1138 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001139 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001140
1141 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001142 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001143
1144 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001145 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001146 try:
1147 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
1148 finally:
1149 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001150 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001151
1152 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001153 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001154
Thomas Wouters477c8d52006-05-27 19:21:47 +00001155 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +00001156 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001157
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001158class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001159 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001160 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1161 #singles.append("close")
1162 #We omit close because it doesn'r raise an exception on some platforms
1163 def get_single(f):
1164 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001165 if hasattr(os, f):
1166 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001167 return helper
1168 for f in singles:
1169 locals()["test_"+f] = get_single(f)
1170
Benjamin Peterson7522c742009-01-19 21:00:09 +00001171 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001172 try:
1173 f(support.make_bad_fd(), *args)
1174 except OSError as e:
1175 self.assertEqual(e.errno, errno.EBADF)
1176 else:
1177 self.fail("%r didn't raise a OSError with a bad file descriptor"
1178 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001179
Serhiy Storchaka79080682013-11-03 21:31:18 +02001180 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001181 def test_isatty(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001182 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001183
Serhiy Storchaka79080682013-11-03 21:31:18 +02001184 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001185 def test_closerange(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001186 fd = support.make_bad_fd()
1187 # Make sure none of the descriptors we are about to close are
1188 # currently valid (issue 6542).
1189 for i in range(10):
1190 try: os.fstat(fd+i)
1191 except OSError:
1192 pass
1193 else:
1194 break
1195 if i < 2:
1196 raise unittest.SkipTest(
1197 "Unable to acquire a range of invalid file descriptors")
1198 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001199
Serhiy Storchaka79080682013-11-03 21:31:18 +02001200 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001201 def test_dup2(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001202 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001203
Serhiy Storchaka79080682013-11-03 21:31:18 +02001204 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001205 def test_fchmod(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001206 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001207
Serhiy Storchaka79080682013-11-03 21:31:18 +02001208 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001209 def test_fchown(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001210 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001211
Serhiy Storchaka79080682013-11-03 21:31:18 +02001212 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001213 def test_fpathconf(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001214 self.check(os.pathconf, "PC_NAME_MAX")
1215 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001216
Serhiy Storchaka79080682013-11-03 21:31:18 +02001217 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001218 def test_ftruncate(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001219 self.check(os.truncate, 0)
1220 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001221
Serhiy Storchaka79080682013-11-03 21:31:18 +02001222 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001223 def test_lseek(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001224 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001225
Serhiy Storchaka79080682013-11-03 21:31:18 +02001226 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001227 def test_read(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001228 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001229
Victor Stinner57ddf782014-01-08 15:21:28 +01001230 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1231 def test_readv(self):
1232 buf = bytearray(10)
1233 self.check(os.readv, [buf])
1234
Serhiy Storchaka79080682013-11-03 21:31:18 +02001235 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001236 def test_tcsetpgrpt(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001237 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001238
Serhiy Storchaka79080682013-11-03 21:31:18 +02001239 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001240 def test_write(self):
Serhiy Storchaka79080682013-11-03 21:31:18 +02001241 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001242
Victor Stinner57ddf782014-01-08 15:21:28 +01001243 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1244 def test_writev(self):
1245 self.check(os.writev, [b'abc'])
1246
Brian Curtin1b9df392010-11-24 20:24:31 +00001247
1248class LinkTests(unittest.TestCase):
1249 def setUp(self):
1250 self.file1 = support.TESTFN
1251 self.file2 = os.path.join(support.TESTFN + "2")
1252
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001253 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001254 for file in (self.file1, self.file2):
1255 if os.path.exists(file):
1256 os.unlink(file)
1257
Brian Curtin1b9df392010-11-24 20:24:31 +00001258 def _test_link(self, file1, file2):
1259 with open(file1, "w") as f1:
1260 f1.write("test")
1261
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001262 with warnings.catch_warnings():
1263 warnings.simplefilter("ignore", DeprecationWarning)
1264 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001265 with open(file1, "r") as f1, open(file2, "r") as f2:
1266 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1267
1268 def test_link(self):
1269 self._test_link(self.file1, self.file2)
1270
1271 def test_link_bytes(self):
1272 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1273 bytes(self.file2, sys.getfilesystemencoding()))
1274
Brian Curtinf498b752010-11-30 15:54:04 +00001275 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001276 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001277 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001278 except UnicodeError:
1279 raise unittest.SkipTest("Unable to encode for this platform.")
1280
Brian Curtinf498b752010-11-30 15:54:04 +00001281 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001282 self.file2 = self.file1 + "2"
1283 self._test_link(self.file1, self.file2)
1284
Serhiy Storchaka79080682013-11-03 21:31:18 +02001285@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1286class PosixUidGidTests(unittest.TestCase):
1287 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1288 def test_setuid(self):
1289 if os.getuid() != 0:
1290 self.assertRaises(os.error, os.setuid, 0)
1291 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001292
Serhiy Storchaka79080682013-11-03 21:31:18 +02001293 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1294 def test_setgid(self):
1295 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1296 self.assertRaises(os.error, os.setgid, 0)
1297 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001298
Serhiy Storchaka79080682013-11-03 21:31:18 +02001299 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1300 def test_seteuid(self):
1301 if os.getuid() != 0:
1302 self.assertRaises(os.error, os.seteuid, 0)
1303 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001304
Serhiy Storchaka79080682013-11-03 21:31:18 +02001305 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1306 def test_setegid(self):
1307 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1308 self.assertRaises(os.error, os.setegid, 0)
1309 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001310
Serhiy Storchaka79080682013-11-03 21:31:18 +02001311 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1312 def test_setreuid(self):
1313 if os.getuid() != 0:
1314 self.assertRaises(os.error, os.setreuid, 0, 0)
1315 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1316 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001317
Serhiy Storchaka79080682013-11-03 21:31:18 +02001318 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1319 def test_setregid(self):
1320 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1321 self.assertRaises(os.error, os.setregid, 0, 0)
1322 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1323 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001324
Serhiy Storchaka79080682013-11-03 21:31:18 +02001325@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1326class Pep383Tests(unittest.TestCase):
1327 def setUp(self):
1328 if support.TESTFN_UNENCODABLE:
1329 self.dir = support.TESTFN_UNENCODABLE
1330 elif support.TESTFN_NONASCII:
1331 self.dir = support.TESTFN_NONASCII
1332 else:
1333 self.dir = support.TESTFN
1334 self.bdir = os.fsencode(self.dir)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001335
Serhiy Storchaka79080682013-11-03 21:31:18 +02001336 bytesfn = []
1337 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001338 try:
Serhiy Storchaka79080682013-11-03 21:31:18 +02001339 fn = os.fsencode(fn)
1340 except UnicodeEncodeError:
1341 return
1342 bytesfn.append(fn)
1343 add_filename(support.TESTFN_UNICODE)
1344 if support.TESTFN_UNENCODABLE:
1345 add_filename(support.TESTFN_UNENCODABLE)
1346 if support.TESTFN_NONASCII:
1347 add_filename(support.TESTFN_NONASCII)
1348 if not bytesfn:
1349 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001350
Serhiy Storchaka79080682013-11-03 21:31:18 +02001351 self.unicodefn = set()
1352 os.mkdir(self.dir)
1353 try:
1354 for fn in bytesfn:
1355 support.create_empty_file(os.path.join(self.bdir, fn))
1356 fn = os.fsdecode(fn)
1357 if fn in self.unicodefn:
1358 raise ValueError("duplicate filename")
1359 self.unicodefn.add(fn)
1360 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001361 shutil.rmtree(self.dir)
Serhiy Storchaka79080682013-11-03 21:31:18 +02001362 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001363
Serhiy Storchaka79080682013-11-03 21:31:18 +02001364 def tearDown(self):
1365 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001366
Serhiy Storchaka79080682013-11-03 21:31:18 +02001367 def test_listdir(self):
1368 expected = self.unicodefn
1369 found = set(os.listdir(self.dir))
1370 self.assertEqual(found, expected)
1371 # test listdir without arguments
1372 current_directory = os.getcwd()
1373 try:
1374 os.chdir(os.sep)
1375 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1376 finally:
1377 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001378
Serhiy Storchaka79080682013-11-03 21:31:18 +02001379 def test_open(self):
1380 for fn in self.unicodefn:
1381 f = open(os.path.join(self.dir, fn), 'rb')
1382 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001383
Serhiy Storchaka79080682013-11-03 21:31:18 +02001384 @unittest.skipUnless(hasattr(os, 'statvfs'),
1385 "need os.statvfs()")
1386 def test_statvfs(self):
1387 # issue #9645
1388 for fn in self.unicodefn:
1389 # should not fail with file not found error
1390 fullname = os.path.join(self.dir, fn)
1391 os.statvfs(fullname)
1392
1393 def test_stat(self):
1394 for fn in self.unicodefn:
1395 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001396
Brian Curtineb24d742010-04-12 17:16:38 +00001397@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1398class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001399 def _kill(self, sig):
1400 # Start sys.executable as a subprocess and communicate from the
1401 # subprocess to the parent that the interpreter is ready. When it
1402 # becomes ready, send *sig* via os.kill to the subprocess and check
1403 # that the return code is equal to *sig*.
1404 import ctypes
1405 from ctypes import wintypes
1406 import msvcrt
1407
1408 # Since we can't access the contents of the process' stdout until the
1409 # process has exited, use PeekNamedPipe to see what's inside stdout
1410 # without waiting. This is done so we can tell that the interpreter
1411 # is started and running at a point where it could handle a signal.
1412 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1413 PeekNamedPipe.restype = wintypes.BOOL
1414 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1415 ctypes.POINTER(ctypes.c_char), # stdout buf
1416 wintypes.DWORD, # Buffer size
1417 ctypes.POINTER(wintypes.DWORD), # bytes read
1418 ctypes.POINTER(wintypes.DWORD), # bytes avail
1419 ctypes.POINTER(wintypes.DWORD)) # bytes left
1420 msg = "running"
1421 proc = subprocess.Popen([sys.executable, "-c",
1422 "import sys;"
1423 "sys.stdout.write('{}');"
1424 "sys.stdout.flush();"
1425 "input()".format(msg)],
1426 stdout=subprocess.PIPE,
1427 stderr=subprocess.PIPE,
1428 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001429 self.addCleanup(proc.stdout.close)
1430 self.addCleanup(proc.stderr.close)
1431 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001432
1433 count, max = 0, 100
1434 while count < max and proc.poll() is None:
1435 # Create a string buffer to store the result of stdout from the pipe
1436 buf = ctypes.create_string_buffer(len(msg))
1437 # Obtain the text currently in proc.stdout
1438 # Bytes read/avail/left are left as NULL and unused
1439 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1440 buf, ctypes.sizeof(buf), None, None, None)
1441 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1442 if buf.value:
1443 self.assertEqual(msg, buf.value.decode())
1444 break
1445 time.sleep(0.1)
1446 count += 1
1447 else:
1448 self.fail("Did not receive communication from the subprocess")
1449
Brian Curtineb24d742010-04-12 17:16:38 +00001450 os.kill(proc.pid, sig)
1451 self.assertEqual(proc.wait(), sig)
1452
1453 def test_kill_sigterm(self):
1454 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001455 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001456
1457 def test_kill_int(self):
1458 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001459 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001460
1461 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001462 tagname = "test_os_%s" % uuid.uuid1()
1463 m = mmap.mmap(-1, 1, tagname)
1464 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001465 # Run a script which has console control handling enabled.
1466 proc = subprocess.Popen([sys.executable,
1467 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001468 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001469 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1470 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001471 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001472 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001473 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001474 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001475 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001476 count += 1
1477 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001478 # Forcefully kill the process if we weren't able to signal it.
1479 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001480 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001481 os.kill(proc.pid, event)
1482 # proc.send_signal(event) could also be done here.
1483 # Allow time for the signal to be passed and the process to exit.
1484 time.sleep(0.5)
1485 if not proc.poll():
1486 # Forcefully kill the process if we weren't able to signal it.
1487 os.kill(proc.pid, signal.SIGINT)
1488 self.fail("subprocess did not stop on {}".format(name))
1489
1490 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1491 def test_CTRL_C_EVENT(self):
1492 from ctypes import wintypes
1493 import ctypes
1494
1495 # Make a NULL value by creating a pointer with no argument.
1496 NULL = ctypes.POINTER(ctypes.c_int)()
1497 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1498 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1499 wintypes.BOOL)
1500 SetConsoleCtrlHandler.restype = wintypes.BOOL
1501
1502 # Calling this with NULL and FALSE causes the calling process to
1503 # handle CTRL+C, rather than ignore it. This property is inherited
1504 # by subprocesses.
1505 SetConsoleCtrlHandler(NULL, 0)
1506
1507 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1508
1509 def test_CTRL_BREAK_EVENT(self):
1510 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1511
1512
Brian Curtind40e6f72010-07-08 21:39:08 +00001513@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001514@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001515class Win32SymlinkTests(unittest.TestCase):
1516 filelink = 'filelinktest'
1517 filelink_target = os.path.abspath(__file__)
1518 dirlink = 'dirlinktest'
1519 dirlink_target = os.path.dirname(filelink_target)
1520 missing_link = 'missing link'
1521
1522 def setUp(self):
1523 assert os.path.exists(self.dirlink_target)
1524 assert os.path.exists(self.filelink_target)
1525 assert not os.path.exists(self.dirlink)
1526 assert not os.path.exists(self.filelink)
1527 assert not os.path.exists(self.missing_link)
1528
1529 def tearDown(self):
1530 if os.path.exists(self.filelink):
1531 os.remove(self.filelink)
1532 if os.path.exists(self.dirlink):
1533 os.rmdir(self.dirlink)
1534 if os.path.lexists(self.missing_link):
1535 os.remove(self.missing_link)
1536
1537 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001538 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001539 self.assertTrue(os.path.exists(self.dirlink))
1540 self.assertTrue(os.path.isdir(self.dirlink))
1541 self.assertTrue(os.path.islink(self.dirlink))
1542 self.check_stat(self.dirlink, self.dirlink_target)
1543
1544 def test_file_link(self):
1545 os.symlink(self.filelink_target, self.filelink)
1546 self.assertTrue(os.path.exists(self.filelink))
1547 self.assertTrue(os.path.isfile(self.filelink))
1548 self.assertTrue(os.path.islink(self.filelink))
1549 self.check_stat(self.filelink, self.filelink_target)
1550
1551 def _create_missing_dir_link(self):
1552 'Create a "directory" link to a non-existent target'
1553 linkname = self.missing_link
1554 if os.path.lexists(linkname):
1555 os.remove(linkname)
1556 target = r'c:\\target does not exist.29r3c740'
1557 assert not os.path.exists(target)
1558 target_is_dir = True
1559 os.symlink(target, linkname, target_is_dir)
1560
1561 def test_remove_directory_link_to_missing_target(self):
1562 self._create_missing_dir_link()
1563 # For compatibility with Unix, os.remove will check the
1564 # directory status and call RemoveDirectory if the symlink
1565 # was created with target_is_dir==True.
1566 os.remove(self.missing_link)
1567
1568 @unittest.skip("currently fails; consider for improvement")
1569 def test_isdir_on_directory_link_to_missing_target(self):
1570 self._create_missing_dir_link()
1571 # consider having isdir return true for directory links
1572 self.assertTrue(os.path.isdir(self.missing_link))
1573
1574 @unittest.skip("currently fails; consider for improvement")
1575 def test_rmdir_on_directory_link_to_missing_target(self):
1576 self._create_missing_dir_link()
1577 # consider allowing rmdir to remove directory links
1578 os.rmdir(self.missing_link)
1579
1580 def check_stat(self, link, target):
1581 self.assertEqual(os.stat(link), os.stat(target))
1582 self.assertNotEqual(os.lstat(link), os.stat(link))
1583
Brian Curtind25aef52011-06-13 15:16:04 -05001584 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001585 with warnings.catch_warnings():
1586 warnings.simplefilter("ignore", DeprecationWarning)
1587 self.assertEqual(os.stat(bytes_link), os.stat(target))
1588 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001589
1590 def test_12084(self):
1591 level1 = os.path.abspath(support.TESTFN)
1592 level2 = os.path.join(level1, "level2")
1593 level3 = os.path.join(level2, "level3")
1594 try:
1595 os.mkdir(level1)
1596 os.mkdir(level2)
1597 os.mkdir(level3)
1598
1599 file1 = os.path.abspath(os.path.join(level1, "file1"))
1600
1601 with open(file1, "w") as f:
1602 f.write("file1")
1603
1604 orig_dir = os.getcwd()
1605 try:
1606 os.chdir(level2)
1607 link = os.path.join(level2, "link")
1608 os.symlink(os.path.relpath(file1), "link")
1609 self.assertIn("link", os.listdir(os.getcwd()))
1610
1611 # Check os.stat calls from the same dir as the link
1612 self.assertEqual(os.stat(file1), os.stat("link"))
1613
1614 # Check os.stat calls from a dir below the link
1615 os.chdir(level1)
1616 self.assertEqual(os.stat(file1),
1617 os.stat(os.path.relpath(link)))
1618
1619 # Check os.stat calls from a dir above the link
1620 os.chdir(level3)
1621 self.assertEqual(os.stat(file1),
1622 os.stat(os.path.relpath(link)))
1623 finally:
1624 os.chdir(orig_dir)
1625 except OSError as err:
1626 self.fail(err)
1627 finally:
1628 os.remove(file1)
1629 shutil.rmtree(level1)
1630
Brian Curtind40e6f72010-07-08 21:39:08 +00001631
Jason R. Coombs3a092862013-05-27 23:21:28 -04001632@support.skip_unless_symlink
1633class NonLocalSymlinkTests(unittest.TestCase):
1634
1635 def setUp(self):
1636 """
1637 Create this structure:
1638
1639 base
1640 \___ some_dir
1641 """
1642 os.makedirs('base/some_dir')
1643
1644 def tearDown(self):
1645 shutil.rmtree('base')
1646
1647 def test_directory_link_nonlocal(self):
1648 """
1649 The symlink target should resolve relative to the link, not relative
1650 to the current directory.
1651
1652 Then, link base/some_link -> base/some_dir and ensure that some_link
1653 is resolved as a directory.
1654
1655 In issue13772, it was discovered that directory detection failed if
1656 the symlink target was not specified relative to the current
1657 directory, which was a defect in the implementation.
1658 """
1659 src = os.path.join('base', 'some_link')
1660 os.symlink('some_dir', src)
1661 assert os.path.isdir(src)
1662
1663
Victor Stinnere8d51452010-08-19 01:05:19 +00001664class FSEncodingTests(unittest.TestCase):
1665 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001666 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1667 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001668
Victor Stinnere8d51452010-08-19 01:05:19 +00001669 def test_identity(self):
1670 # assert fsdecode(fsencode(x)) == x
1671 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1672 try:
1673 bytesfn = os.fsencode(fn)
1674 except UnicodeEncodeError:
1675 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001676 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001677
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001678
Brett Cannonefb00c02012-02-29 18:31:31 -05001679
1680class DeviceEncodingTests(unittest.TestCase):
1681
1682 def test_bad_fd(self):
1683 # Return None when an fd doesn't actually exist.
1684 self.assertIsNone(os.device_encoding(123456))
1685
Philip Jenveye308b7c2012-02-29 16:16:15 -08001686 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
1687 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08001688 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05001689 def test_device_encoding(self):
1690 encoding = os.device_encoding(0)
1691 self.assertIsNotNone(encoding)
1692 self.assertTrue(codecs.lookup(encoding))
1693
1694
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001695class PidTests(unittest.TestCase):
1696 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1697 def test_getppid(self):
1698 p = subprocess.Popen([sys.executable, '-c',
1699 'import os; print(os.getppid())'],
1700 stdout=subprocess.PIPE)
1701 stdout, _ = p.communicate()
1702 # We are the parent of our subprocess
1703 self.assertEqual(int(stdout), os.getpid())
1704
1705
Brian Curtin0151b8e2010-09-24 13:43:43 +00001706# The introduction of this TestCase caused at least two different errors on
1707# *nix buildbots. Temporarily skip this to let the buildbots move along.
1708@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001709@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1710class LoginTests(unittest.TestCase):
1711 def test_getlogin(self):
1712 user_name = os.getlogin()
1713 self.assertNotEqual(len(user_name), 0)
1714
1715
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001716@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1717 "needs os.getpriority and os.setpriority")
1718class ProgramPriorityTests(unittest.TestCase):
1719 """Tests for os.getpriority() and os.setpriority()."""
1720
1721 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001722
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001723 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1724 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1725 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001726 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1727 if base >= 19 and new_prio <= 19:
1728 raise unittest.SkipTest(
1729 "unable to reliably test setpriority at current nice level of %s" % base)
1730 else:
1731 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001732 finally:
1733 try:
1734 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1735 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001736 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001737 raise
1738
1739
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001740if threading is not None:
1741 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001742
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001743 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001744
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001745 def __init__(self, conn):
1746 asynchat.async_chat.__init__(self, conn)
1747 self.in_buffer = []
1748 self.closed = False
1749 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001750
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001751 def handle_read(self):
1752 data = self.recv(4096)
1753 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001754
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001755 def get_data(self):
1756 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001757
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001758 def handle_close(self):
1759 self.close()
1760 self.closed = True
1761
1762 def handle_error(self):
1763 raise
1764
1765 def __init__(self, address):
1766 threading.Thread.__init__(self)
1767 asyncore.dispatcher.__init__(self)
1768 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1769 self.bind(address)
1770 self.listen(5)
1771 self.host, self.port = self.socket.getsockname()[:2]
1772 self.handler_instance = None
1773 self._active = False
1774 self._active_lock = threading.Lock()
1775
1776 # --- public API
1777
1778 @property
1779 def running(self):
1780 return self._active
1781
1782 def start(self):
1783 assert not self.running
1784 self.__flag = threading.Event()
1785 threading.Thread.start(self)
1786 self.__flag.wait()
1787
1788 def stop(self):
1789 assert self.running
1790 self._active = False
1791 self.join()
1792
1793 def wait(self):
1794 # wait for handler connection to be closed, then stop the server
1795 while not getattr(self.handler_instance, "closed", False):
1796 time.sleep(0.001)
1797 self.stop()
1798
1799 # --- internals
1800
1801 def run(self):
1802 self._active = True
1803 self.__flag.set()
1804 while self._active and asyncore.socket_map:
1805 self._active_lock.acquire()
1806 asyncore.loop(timeout=0.001, count=1)
1807 self._active_lock.release()
1808 asyncore.close_all()
1809
1810 def handle_accept(self):
1811 conn, addr = self.accept()
1812 self.handler_instance = self.Handler(conn)
1813
1814 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001815 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001816 handle_read = handle_connect
1817
1818 def writable(self):
1819 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001820
1821 def handle_error(self):
1822 raise
1823
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001824
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001825@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001826@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1827class TestSendfile(unittest.TestCase):
1828
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001829 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001830 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001831 not sys.platform.startswith("solaris") and \
1832 not sys.platform.startswith("sunos")
Serhiy Storchaka79080682013-11-03 21:31:18 +02001833 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
1834 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001835
1836 @classmethod
1837 def setUpClass(cls):
1838 with open(support.TESTFN, "wb") as f:
1839 f.write(cls.DATA)
1840
1841 @classmethod
1842 def tearDownClass(cls):
1843 support.unlink(support.TESTFN)
1844
1845 def setUp(self):
1846 self.server = SendfileTestServer((support.HOST, 0))
1847 self.server.start()
1848 self.client = socket.socket()
1849 self.client.connect((self.server.host, self.server.port))
1850 self.client.settimeout(1)
1851 # synchronize by waiting for "220 ready" response
1852 self.client.recv(1024)
1853 self.sockno = self.client.fileno()
1854 self.file = open(support.TESTFN, 'rb')
1855 self.fileno = self.file.fileno()
1856
1857 def tearDown(self):
1858 self.file.close()
1859 self.client.close()
1860 if self.server.running:
1861 self.server.stop()
1862
1863 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1864 """A higher level wrapper representing how an application is
1865 supposed to use sendfile().
1866 """
1867 while 1:
1868 try:
1869 if self.SUPPORT_HEADERS_TRAILERS:
1870 return os.sendfile(sock, file, offset, nbytes, headers,
1871 trailers)
1872 else:
1873 return os.sendfile(sock, file, offset, nbytes)
1874 except OSError as err:
1875 if err.errno == errno.ECONNRESET:
1876 # disconnected
1877 raise
1878 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1879 # we have to retry send data
1880 continue
1881 else:
1882 raise
1883
1884 def test_send_whole_file(self):
1885 # normal send
1886 total_sent = 0
1887 offset = 0
1888 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001889 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001890 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1891 if sent == 0:
1892 break
1893 offset += sent
1894 total_sent += sent
1895 self.assertTrue(sent <= nbytes)
1896 self.assertEqual(offset, total_sent)
1897
1898 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001899 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001900 self.client.close()
1901 self.server.wait()
1902 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001903 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001904 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001905
1906 def test_send_at_certain_offset(self):
1907 # start sending a file at a certain offset
1908 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001909 offset = len(self.DATA) // 2
1910 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001911 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001912 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001913 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1914 if sent == 0:
1915 break
1916 offset += sent
1917 total_sent += sent
1918 self.assertTrue(sent <= nbytes)
1919
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001920 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001921 self.client.close()
1922 self.server.wait()
1923 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001924 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001925 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001926 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001927 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001928
1929 def test_offset_overflow(self):
1930 # specify an offset > file size
1931 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001932 try:
1933 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1934 except OSError as e:
1935 # Solaris can raise EINVAL if offset >= file length, ignore.
1936 if e.errno != errno.EINVAL:
1937 raise
1938 else:
1939 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001940 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001941 self.client.close()
1942 self.server.wait()
1943 data = self.server.handler_instance.get_data()
1944 self.assertEqual(data, b'')
1945
1946 def test_invalid_offset(self):
1947 with self.assertRaises(OSError) as cm:
1948 os.sendfile(self.sockno, self.fileno, -1, 4096)
1949 self.assertEqual(cm.exception.errno, errno.EINVAL)
1950
1951 # --- headers / trailers tests
1952
Serhiy Storchaka79080682013-11-03 21:31:18 +02001953 @requires_headers_trailers
1954 def test_headers(self):
1955 total_sent = 0
1956 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1957 headers=[b"x" * 512])
1958 total_sent += sent
1959 offset = 4096
1960 nbytes = 4096
1961 while 1:
1962 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1963 offset, nbytes)
1964 if sent == 0:
1965 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001966 total_sent += sent
Serhiy Storchaka79080682013-11-03 21:31:18 +02001967 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001968
Serhiy Storchaka79080682013-11-03 21:31:18 +02001969 expected_data = b"x" * 512 + self.DATA
1970 self.assertEqual(total_sent, len(expected_data))
1971 self.client.close()
1972 self.server.wait()
1973 data = self.server.handler_instance.get_data()
1974 self.assertEqual(hash(data), hash(expected_data))
1975
1976 @requires_headers_trailers
1977 def test_trailers(self):
1978 TESTFN2 = support.TESTFN + "2"
1979 file_data = b"abcdef"
1980 with open(TESTFN2, 'wb') as f:
1981 f.write(file_data)
1982 with open(TESTFN2, 'rb')as f:
1983 self.addCleanup(os.remove, TESTFN2)
1984 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
1985 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001986 self.client.close()
1987 self.server.wait()
1988 data = self.server.handler_instance.get_data()
Serhiy Storchaka79080682013-11-03 21:31:18 +02001989 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001990
Serhiy Storchaka79080682013-11-03 21:31:18 +02001991 @requires_headers_trailers
1992 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
1993 'test needs os.SF_NODISKIO')
1994 def test_flags(self):
1995 try:
1996 os.sendfile(self.sockno, self.fileno, 0, 4096,
1997 flags=os.SF_NODISKIO)
1998 except OSError as err:
1999 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2000 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002001
2002
Larry Hastings9cf065c2012-06-22 16:30:09 -07002003def supports_extended_attributes():
2004 if not hasattr(os, "setxattr"):
2005 return False
2006 try:
2007 with open(support.TESTFN, "wb") as fp:
2008 try:
2009 os.setxattr(fp.fileno(), b"user.test", b"")
2010 except OSError:
2011 return False
2012 finally:
2013 support.unlink(support.TESTFN)
2014 # Kernels < 2.6.39 don't respect setxattr flags.
2015 kernel_version = platform.release()
2016 m = re.match("2.6.(\d{1,2})", kernel_version)
2017 return m is None or int(m.group(1)) >= 39
2018
2019
2020@unittest.skipUnless(supports_extended_attributes(),
2021 "no non-broken extended attribute support")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002022class ExtendedAttributeTests(unittest.TestCase):
2023
2024 def tearDown(self):
2025 support.unlink(support.TESTFN)
2026
Larry Hastings9cf065c2012-06-22 16:30:09 -07002027 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002028 fn = support.TESTFN
2029 open(fn, "wb").close()
2030 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002031 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002032 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002033 init_xattr = listxattr(fn)
2034 self.assertIsInstance(init_xattr, list)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002035 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002036 xattr = set(init_xattr)
2037 xattr.add("user.test")
2038 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002039 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2040 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2041 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Benjamin Peterson799bd802011-08-31 22:15:17 -04002042 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002043 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002044 self.assertEqual(cm.exception.errno, errno.EEXIST)
2045 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002046 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002047 self.assertEqual(cm.exception.errno, errno.ENODATA)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002048 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002049 xattr.add("user.test2")
2050 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002051 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002052 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002053 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002054 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002055 xattr.remove("user.test")
2056 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002057 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2058 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2059 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2060 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002061 many = sorted("user.test{}".format(i) for i in range(100))
2062 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002063 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002064 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002065
Larry Hastings9cf065c2012-06-22 16:30:09 -07002066 def _check_xattrs(self, *args, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002067 def make_bytes(s):
2068 return bytes(s, "ascii")
Larry Hastings9cf065c2012-06-22 16:30:09 -07002069 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002070 support.unlink(support.TESTFN)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002071 self._check_xattrs_str(make_bytes, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002072
2073 def test_simple(self):
2074 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2075 os.listxattr)
2076
2077 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002078 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2079 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002080
2081 def test_fds(self):
2082 def getxattr(path, *args):
2083 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002084 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002085 def setxattr(path, *args):
2086 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002087 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002088 def removexattr(path, *args):
2089 with open(path, "wb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002090 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002091 def listxattr(path, *args):
2092 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002093 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002094 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2095
2096
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002097@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2098class Win32DeprecatedBytesAPI(unittest.TestCase):
2099 def test_deprecated(self):
2100 import nt
2101 filename = os.fsencode(support.TESTFN)
2102 with warnings.catch_warnings():
2103 warnings.simplefilter("error", DeprecationWarning)
2104 for func, *args in (
2105 (nt._getfullpathname, filename),
2106 (nt._isdir, filename),
2107 (os.access, filename, os.R_OK),
2108 (os.chdir, filename),
2109 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01002110 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002111 (os.link, filename, filename),
2112 (os.listdir, filename),
2113 (os.lstat, filename),
2114 (os.mkdir, filename),
2115 (os.open, filename, os.O_RDONLY),
2116 (os.rename, filename, filename),
2117 (os.rmdir, filename),
2118 (os.startfile, filename),
2119 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002120 (os.unlink, filename),
2121 (os.utime, filename),
2122 ):
2123 self.assertRaises(DeprecationWarning, func, *args)
2124
Victor Stinner28216442011-11-16 00:34:44 +01002125 @support.skip_unless_symlink
2126 def test_symlink(self):
2127 filename = os.fsencode(support.TESTFN)
2128 with warnings.catch_warnings():
2129 warnings.simplefilter("error", DeprecationWarning)
2130 self.assertRaises(DeprecationWarning,
2131 os.symlink, filename, filename)
2132
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002133
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002134@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2135class TermsizeTests(unittest.TestCase):
2136 def test_does_not_crash(self):
2137 """Check if get_terminal_size() returns a meaningful value.
2138
2139 There's no easy portable way to actually check the size of the
2140 terminal, so let's check if it returns something sensible instead.
2141 """
2142 try:
2143 size = os.get_terminal_size()
2144 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002145 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002146 # Under win32 a generic OSError can be thrown if the
2147 # handle cannot be retrieved
2148 self.skipTest("failed to query terminal size")
2149 raise
2150
Antoine Pitroucfade362012-02-08 23:48:59 +01002151 self.assertGreaterEqual(size.columns, 0)
2152 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002153
2154 def test_stty_match(self):
2155 """Check if stty returns the same results
2156
2157 stty actually tests stdin, so get_terminal_size is invoked on
2158 stdin explicitly. If stty succeeded, then get_terminal_size()
2159 should work too.
2160 """
2161 try:
2162 size = subprocess.check_output(['stty', 'size']).decode().split()
2163 except (FileNotFoundError, subprocess.CalledProcessError):
2164 self.skipTest("stty invocation failed")
2165 expected = (int(size[1]), int(size[0])) # reversed order
2166
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002167 try:
2168 actual = os.get_terminal_size(sys.__stdin__.fileno())
2169 except OSError as e:
2170 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2171 # Under win32 a generic OSError can be thrown if the
2172 # handle cannot be retrieved
2173 self.skipTest("failed to query terminal size")
2174 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002175 self.assertEqual(expected, actual)
2176
2177
Antoine Pitrouf26ad712011-07-15 23:00:56 +02002178@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00002179def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002180 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00002181 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002182 StatAttributeTests,
2183 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002184 WalkTests,
Charles-François Natali7372b062012-02-05 15:15:38 +01002185 FwalkTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00002186 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00002187 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00002188 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00002189 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00002190 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00002191 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00002192 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00002193 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002194 Win32KillTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00002195 Win32SymlinkTests,
Jason R. Coombs3a092862013-05-27 23:21:28 -04002196 NonLocalSymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00002197 FSEncodingTests,
Brett Cannonefb00c02012-02-29 18:31:31 -05002198 DeviceEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002199 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002200 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00002201 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002202 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002203 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04002204 ExtendedAttributeTests,
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002205 Win32DeprecatedBytesAPI,
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002206 TermsizeTests,
Andrew Svetlov405faed2012-12-25 12:18:09 +02002207 RemoveDirsTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002208 )
Fred Drake2e2be372001-09-20 21:33:42 +00002209
2210if __name__ == "__main__":
2211 test_main()