blob: d47c8d35a060dfe7128018112a117735c5d75283 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000027try:
28 import threading
29except ImportError:
30 threading = None
Georg Brandl2daf6ae2012-02-20 19:54:16 +010031from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000032
# os.stat_float_times() is deprecated; mute only that warning while making
# sure os.stat() reports float timestamps for the rest of this module.
with warnings.catch_warnings():
    warnings.simplefilter("ignore", DeprecationWarning)
    os.stat_float_times(True)
st = os.stat(__file__)
# Subsecond resolution is assumed as soon as one float timestamp attribute
# differs from the integer stored in the matching tuple slot.
stat_supports_subsecond = any(
    getattr(st, attr_name) != st[slot]
    for attr_name, slot in (("st_atime", 7), ("st_mtime", 8), ("st_ctime", 9)))
42
Mark Dickinson7cf03892010-04-16 13:45:35 +000043# Detect whether we're on a Linux system that uses the (now outdated
44# and unmaintained) linuxthreads threading library. There's an issue
45# when combining linuxthreads with a failed execv call: see
46# http://bugs.python.org/issue4970.
# False when sys.thread_info is missing or reports no version string;
# otherwise True only if that version string starts with "linuxthreads".
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))
Brian Curtineb24d742010-04-12 17:16:38 +000051
Thomas Wouters0e3f5912006-08-11 14:57:12 +000052# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file functions operating on support.TESTFN."""

    def setUp(self):
        # Remove any file sitting at TESTFN so each test starts clean.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The very same removal is used as cleanup after each test.
    tearDown = setUp

    def test_access(self):
        # A file that was just created must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        # "first" must now be closed, so writing to it has to fail.
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() returns exactly the bytes that were written.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a check above fails, so it is
            # never leaked.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination file.
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        # The source is gone and the destination holds the source's content.
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
158
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200159
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000160# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for os.stat()/os.statvfs() results and the utime family.

    Every test runs against a directory support.TESTFN that contains a
    single 3-byte file, self.fname.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    @staticmethod
    def _split_timestamp(timestamp):
        """Split a float timestamp into (whole seconds, nanoseconds) ints."""
        seconds, fraction = divmod(timestamp, 1.0)
        return int(seconds), int(fraction * 1e9)

    def check_stat_attributes(self, fname):
        """Check the object returned by os.stat(fname).

        Covers index and attribute access, agreement of the float and
        nanosecond timestamps, rejection of attribute assignment, and the
        length checks of the os.stat_result constructor.
        """
        if not hasattr(os, "stat"):
            return

        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # The time attributes may be floats; truncate them
                    # before comparing with the indexed value.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end must fail.
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): the original test tolerates success here (only a
        # TypeError is accepted, nothing is required); that is preserved.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        # Same checks as above, with the path passed as bytes.
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            return

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                return
            # Any other failure is a real error; swallowing it would only
            # surface later as an unbound "result".
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): as in the original, success is tolerated here.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def _test_utime(self, filename, attr, utime, delta):
        """Exercise the supported calling conventions for setting times.

        filename -- path whose timestamps are set and checked.
        attr     -- callable (stat_result, field_name) returning a timestamp.
        utime    -- callable (filename, times) setting (atime, mtime), given
                    in whatever unit *attr* returns.
        delta    -- largest accepted difference between the mtimes produced
                    by os.utime(filename, None) and os.utime(filename).
        """
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Stat the path that was just touched, so st2 and st3 describe the
        # same object whichever path the caller passed in.
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)


    def _test_utime_ns(self, set_times_ns, test_dir=True):
        """Run _test_utime() on the "_ns" stat fields with a ten-second delta.

        set_times_ns -- callable (filename, (atime_ns, mtime_ns)).
        test_dir     -- when true, also run against the TESTFN directory.
        """
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    requires_lutimes = unittest.skipUnless(hasattr(os, 'lutimes'),
                            "os.lutimes required for this test.")
    requires_futimes = unittest.skipUnless(hasattr(os, 'futimes'),
                            "os.futimes required for this test.")

    @requires_lutimes
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.lutimes(file, ns=times)
        self._test_utime_ns(lutimes_ns)

    @requires_futimes
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.futimes(f.fileno(), ns=times)
        # futimes_ns() opens its argument with open(..., "wb"), so the
        # directory variant is left out.
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        """Check that os.<name> rejects a times tuple combined with ns=."""
        with self.assertRaises(RuntimeError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)

    @requires_lutimes
    def test_lutimes_invalid_arguments(self):
        self._utime_invalid_arguments('lutimes', self.fname)

    @requires_futimes
    def test_futimes_invalid_arguments(self):
        with open(self.fname, "wb") as f:
            self._utime_invalid_arguments('futimes', f.fileno())


    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        """Set atime=1.901 and mtime=2.901 via set_time_func and check them.

        set_time_func -- callable (filename, atime, mtime) taking float
                         timestamps in seconds.
        """
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        # Reset both times first so stale values cannot satisfy the checks.
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_futimes
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.futimes(f.fileno(), (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimens'),
                         "os.futimens required for this test.")
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.futimens(f.fileno(),
                            self._split_timestamp(atime),
                            self._split_timestamp(mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimesat'),
                         "os.futimesat required for this test.")
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.futimesat(dirfd, os.path.basename(filename),
                             (atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_lutimes
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.lutimes(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'utimensat'),
                         "os.utimensat required for this test.")
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utimensat(dirfd, os.path.basename(filename),
                             self._split_timestamp(atime),
                             self._split_timestamp(mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Called while the class body executes (not a test method):
            # returns the file system name reported for the drive holding
            # *path*, or None when GetVolumeInformationW() reports failure.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
497
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000498from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000499
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment (and its bytes twin, where available) so
        # tearDown can put it back, then install the reference variables.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        reference = self._reference()
        for name, setting in reference.items():
            os.environ[name] = setting

    def tearDown(self):
        # Throw away whatever the test left behind and restore the snapshot.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        # Mapping hook: the sample key/value pairs used by the shared tests.
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        # Mapping hook: os.environ itself, emptied first.
        os.environ.clear()
        return os.environ

    # Bug 1110478
    def test_update2(self):
        os.environ.clear()
        if not os.path.exists("/bin/sh"):
            return
        os.environ.update(HELLO="World")
        # A child shell must see the variable set through update().
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    def test_os_popen_iter(self):
        if not os.path.exists("/bin/sh"):
            return
        with os.popen(
            "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
            it = iter(popen)
            # The three echoed lines come back one at a time, then the
            # iterator is exhausted.
            for expected_line in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(it), expected_line)
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, setting in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(setting), str)

    def test_items(self):
        # Every reference pair installed by setUp is readable via get().
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        rendered_pairs = ['{!r}: {!r}'.format(name, setting)
                          for name, setting in env.items()]
        expected = 'environ({{{}}})'.format(', '.join(rendered_pairs))
        self.assertEqual(repr(env), expected)

    def test_get_exec_path(self):
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}
        defpath_list = os.defpath.split(os.pathsep)

        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        explicit_cases = (
            # No PATH environment variable
            (defpath_list, {}),
            # Empty PATH environment variable
            (('',), {'PATH': ''}),
            # Supplied PATH environment variable
            (test_path, test_env),
        )
        for expected, env in explicit_cases:
            self.assertSequenceEqual(expected, os.get_exec_path(env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        for env in ({b'PATH': b'abc'}, {b'PATH': 'abc'}, {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        fs_encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(fs_encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(fs_encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key, expected_error = 'x' * 50000, ValueError
        else:
            # "=" is not allowed in a variable name
            key, expected_error = 'key=', OSError
        self.assertRaises(expected_error, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100640
class WalkTests(unittest.TestCase):
    """Tests for os.walk().

    setUp() only builds the directory tree below support.TESTFN and records
    the interesting paths on the instance.  The traversal checks live in
    separate test_* methods so that unittest collects and runs them for this
    class itself; subclasses that reuse the fixture inherit them as well.
    """

    def setUp(self):
        """Create the directory tree that the tests walk."""
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        self.sub2_path = join(self.walk_path, "SUB2")
        self.link_path = join(self.sub2_path, "link")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(self.sub2_path, "tmp3")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(self.sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(self.sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # The context manager closes the file even if write() fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if support.can_symlink():
            if os.name == 'nt':
                # On Windows the third argument marks the target as a
                # directory.
                def symlink_to_dir(src, dest):
                    os.symlink(src, dest, True)
            else:
                symlink_to_dir = os.symlink
            symlink_to_dir(os.path.abspath(t2_path), self.link_path)
            symlink_to_dir('broken', broken_link_path)
            # Without followlinks the dangling link is reported as a file.
            self.sub2_tree = (self.sub2_path, ["link"],
                              ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (self.sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(os.walk(self.walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(self.walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))
        walked[1][-1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(os.walk(self.walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")
        # Walk, following symlinks.
        for root, dirs, files in os.walk(self.walk_path, followlinks=True):
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is listed in dirs but must be
                # removed like a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000760
Charles-François Natali7372b062012-02-05 15:15:38 +0100761
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk().

    The directory tree is the one created by WalkTests.setUp(); only the
    clean-up is redone here using directory file descriptors.
    """

    def test_compare_to_walk(self):
        # fwalk() must report the same directories and files as walk() for
        # every (topdown, followlinks) combination.
        for topdown in (True, False):
            for followlinks in (True, False):
                walk_args = (support.TESTFN, topdown, None, followlinks)
                expected = {
                    top: (set(subdirs), set(names))
                    for top, subdirs, names in os.walk(*walk_args)
                }

                for top, subdirs, names, dirfd in os.fwalk(*walk_args):
                    self.assertIn(top, expected)
                    self.assertEqual(expected[top],
                                     (set(subdirs), set(names)))

    def test_dir_fd(self):
        # The descriptor yielded with each directory must be usable.
        for topdown in (True, False):
            for followlinks in (True, False):
                walk_args = (support.TESTFN, topdown, None, followlinks)
                for top, subdirs, names, dirfd in os.fwalk(*walk_args):
                    # fstat() raises if the FD is not valid.
                    os.fstat(dirfd)
                    # flistdir() on the FD has to agree with what fwalk()
                    # reported for this directory.
                    listed = set(os.flistdir(dirfd))
                    self.assertEqual(listed, set(subdirs) | set(names))

    def test_fd_leak(self):
        # fwalk() opens many FDs, so leaks are checked in two ways: walking
        # the tree a large number of times must not run into EMFILE, and
        # the lowest free descriptor must be the same afterwards.
        lowest_free = os.dup(1)
        os.close(lowest_free)
        for _ in range(256):
            for _entry in os.fwalk(support.TESTFN):
                pass
        probe = os.dup(1)
        self.addCleanup(os.close, probe)
        self.assertEqual(probe, lowest_free)

    def tearDown(self):
        # Remove the tree bottom-up, addressing every entry relative to the
        # directory descriptor handed out by fwalk().
        walker = os.fwalk(support.TESTFN, topdown=False)
        for top, subdirs, names, dirfd in walker:
            for entry in names:
                os.unlinkat(dirfd, entry)
            for entry in subdirs:
                info = os.fstatat(dirfd, entry, os.AT_SYMLINK_NOFOLLOW)
                # Entries listed as directories may be symlinks; only real
                # directories are removed with AT_REMOVEDIR.
                if stat.S_ISDIR(info.st_mode):
                    os.unlinkat(dirfd, entry, os.AT_REMOVEDIR)
                else:
                    os.unlinkat(dirfd, entry)
        os.rmdir(support.TESTFN)
813
814
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh support.TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5',
                            os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process-wide umask even when an assertion fails, so a
        # failure here cannot influence the tests that run afterwards.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode,
                              exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776,
                              exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            with self.assertRaises(OSError):
                # Should fail when the bit is not already set when demanded.
                os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even on failure: tearDown() can only remove
        # directories, so a leftover regular file would break the clean-up.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
891
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        # Data written to the null device is discarded; the with block
        # closes the file, so no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        # Reading from the null device yields nothing.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000899
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        # Two independent draws must not produce the same bytes.
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() output from a child Python.

        The child writes the random bytes to its stdout; the number of
        bytes received is checked against *count* before they are returned.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        # Index 1 of the result is used as the child's stdout.
        stdout = out[1]
        # Compare against the requested size rather than a fixed 16 so the
        # helper stays correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        # Two separate interpreter processes must not yield the same bytes.
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000928
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000929@contextlib.contextmanager
930def _execvpe_mockup(defpath=None):
931 """
932 Stubs out execv and execve functions when used as context manager.
933 Records exec calls. The mock execv and execve functions always raise an
934 exception as they would normally never return.
935 """
936 # A list of tuples containing (function name, first arg, args)
937 # of calls to execv or execve that have been made.
938 calls = []
939
940 def mock_execv(name, *args):
941 calls.append(('execv', name, args))
942 raise RuntimeError("execv called")
943
944 def mock_execve(name, *args):
945 calls.append(('execve', name, args))
946 raise OSError(errno.ENOTDIR, "execve called")
947
948 try:
949 orig_execv = os.execv
950 orig_execve = os.execve
951 orig_defpath = os.defpath
952 os.execv = mock_execv
953 os.execve = mock_execve
954 if defpath is not None:
955 os.defpath = defpath
956 yield calls
957 finally:
958 os.execv = orig_execv
959 os.execve = orig_execve
960 os.defpath = orig_defpath
961
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list is rejected with ValueError.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Check how os._execvpe() resolves the program to execute.

        *test_type* is either str or bytes and selects the type used for
        the program name.  os.execv/os.execve are stubbed out with
        _execvpe_mockup(), so nothing is really executed; only the
        recorded calls are inspected.
        """
        search_dir = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            # Outside Windows the recorded execve() path is expected as
            # bytes.
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # Absolute path: handed to execv() unchanged.
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # Relative path, no PATH in env:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # Relative path, PATH present in env:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = search_dir
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath,
                             (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised outside Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001025
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001026
class Win32ErrorTests(unittest.TestCase):
    """Check that failing os calls raise WindowsError.

    Apart from test_mkdir, which creates the file itself, every test calls
    the function on support.TESTFN without creating it first.
    """

    def test_rename(self):
        backup = support.TESTFN + ".bak"
        with self.assertRaises(WindowsError):
            os.rename(support.TESTFN, backup)

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir() must fail while a regular file of that name exists.
        scratch = open(support.TESTFN, "w")
        try:
            with self.assertRaises(WindowsError):
                os.mkdir(support.TESTFN)
        finally:
            # Close before unlinking the file.
            scratch.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001050
class TestInvalidFD(unittest.TestCase):
    """Check that fd-based os functions report EBADF for an invalid fd."""

    # Functions whose only argument is the file descriptor.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build a test method for the os function named *f*; the generated
        # test does nothing when os lacks that function.
        def helper(self):
            if not hasattr(os, f):
                return
            self.check(getattr(os, f))
        return helper
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require an OSError with errno EBADF."""
        try:
            f(support.make_bad_fd(), *args)
        except OSError as exc:
            self.assertEqual(exc.errno, errno.EBADF)
            return
        self.fail("%r didn't raise a OSError with a bad file descriptor"
                  % f)

    def test_isatty(self):
        if not hasattr(os, "isatty"):
            return
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    def test_closerange(self):
        if not hasattr(os, "closerange"):
            return
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                continue
            # fd+i is a valid descriptor: stop just before it.
            break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertIsNone(os.closerange(fd, fd + i-1))

    def test_dup2(self):
        if not hasattr(os, "dup2"):
            return
        self.check(os.dup2, 20)

    def test_fchmod(self):
        if not hasattr(os, "fchmod"):
            return
        self.check(os.fchmod, 0)

    def test_fchown(self):
        if not hasattr(os, "fchown"):
            return
        self.check(os.fchown, -1, -1)

    def test_fpathconf(self):
        if not hasattr(os, "fpathconf"):
            return
        self.check(os.fpathconf, "PC_NAME_MAX")

    def test_ftruncate(self):
        if not hasattr(os, "ftruncate"):
            return
        self.check(os.ftruncate, 0)

    def test_lseek(self):
        if not hasattr(os, "lseek"):
            return
        self.check(os.lseek, 0, 0)

    def test_read(self):
        if not hasattr(os, "read"):
            return
        self.check(os.read, 1)

    def test_tcsetpgrpt(self):
        if not hasattr(os, "tcsetpgrp"):
            return
        self.check(os.tcsetpgrp, 0)

    def test_write(self):
        if not hasattr(os, "write"):
            return
        self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001128
Brian Curtin1b9df392010-11-24 20:24:31 +00001129
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        # file1 is the link source, file2 the link that gets created.
        self.file1 = support.TESTFN
        self.file2 = os.path.join(support.TESTFN + "2")

    def tearDown(self):
        # Remove whichever of the two files exist.
        for leftover in (self.file1, self.file2):
            if not os.path.exists(leftover):
                continue
            os.unlink(leftover)

    def _test_link(self, file1, file2):
        """Create *file1*, link it to *file2* and check both are one file."""
        with open(file1, "w") as source:
            source.write("test")

        # DeprecationWarning is silenced while the link is created.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # Same check with both names encoded to bytes.
        fs_encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(fs_encoding),
                        self.file2.encode(fs_encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            self.skipTest("Unable to encode for this platform.")

        # Use a non-ASCII character in both file names.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1166
if sys.platform != 'win32':
    class Win32ErrorTests(unittest.TestCase):
        # Placeholder: replaces the class of the same name with an empty
        # one on platforms other than win32.
        pass

    class PosixUidGidTests(unittest.TestCase):
        # Each test is only defined when os provides the function.  The
        # OverflowError checks pass an id that does not fit in 32 bits; the
        # os.error checks are limited to processes not running as uid 0.

        if hasattr(os, 'setuid'):
            def test_setuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setuid(0)
                with self.assertRaises(OverflowError):
                    os.setuid(1<<32)

        if hasattr(os, 'setgid'):
            def test_setgid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setgid(0)
                with self.assertRaises(OverflowError):
                    os.setgid(1<<32)

        if hasattr(os, 'seteuid'):
            def test_seteuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.seteuid(0)
                with self.assertRaises(OverflowError):
                    os.seteuid(1<<32)

        if hasattr(os, 'setegid'):
            def test_setegid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setegid(0)
                with self.assertRaises(OverflowError):
                    os.setegid(1<<32)

        if hasattr(os, 'setreuid'):
            def test_setreuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setreuid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(0, 1<<32)

            def test_setreuid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                command = [
                    sys.executable, '-c',
                    'import os,sys;os.setreuid(-1,-1);sys.exit(0)']
                subprocess.check_call(command)

        if hasattr(os, 'setregid'):
            def test_setregid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setregid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(0, 1<<32)

            def test_setregid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                command = [
                    sys.executable, '-c',
                    'import os,sys;os.setregid(-1,-1);sys.exit(0)']
                subprocess.check_call(command)

    class Pep383Tests(unittest.TestCase):
        # Works on a directory holding files with non-ASCII names; the same
        # names are kept as str in self.unicodefn.

        def setUp(self):
            # Prefer the unencodable test name for the directory when the
            # support module provides one.
            self.dir = support.TESTFN_UNENCODABLE or support.TESTFN
            self.bdir = os.fsencode(self.dir)

            candidates = [support.TESTFN_UNICODE]
            if support.TESTFN_UNENCODABLE:
                candidates.append(support.TESTFN_UNENCODABLE)
            # Keep only the names that can be encoded to bytes.
            encoded_names = []
            for candidate in candidates:
                try:
                    encoded = os.fsencode(candidate)
                except UnicodeEncodeError:
                    continue
                encoded_names.append(encoded)
            if not encoded_names:
                self.skipTest("couldn't create any non-ascii filename")

            self.unicodefn = set()
            os.mkdir(self.dir)
            try:
                for encoded in encoded_names:
                    support.create_empty_file(
                        os.path.join(self.bdir, encoded))
                    decoded = os.fsdecode(encoded)
                    if decoded in self.unicodefn:
                        raise ValueError("duplicate filename")
                    self.unicodefn.add(decoded)
            except BaseException:
                # Whatever went wrong, remove the directory before letting
                # the error propagate.
                shutil.rmtree(self.dir)
                raise

        def tearDown(self):
            shutil.rmtree(self.dir)

        def test_listdir(self):
            # listdir() must return exactly the names created in setUp().
            listed = set(os.listdir(self.dir))
            self.assertEqual(listed, self.unicodefn)

        def test_open(self):
            for name in self.unicodefn:
                handle = open(os.path.join(self.dir, name), 'rb')
                handle.close()

        def test_stat(self):
            for name in self.unicodefn:
                os.stat(os.path.join(self.dir, name))
else:
    # Empty placeholders so the names exist on win32 as well.
    class PosixUidGidTests(unittest.TestCase):
        pass
    class Pep383Tests(unittest.TestCase):
        pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001279
Brian Curtineb24d742010-04-12 17:16:38 +00001280@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1281class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001282 def _kill(self, sig):
1283 # Start sys.executable as a subprocess and communicate from the
1284 # subprocess to the parent that the interpreter is ready. When it
1285 # becomes ready, send *sig* via os.kill to the subprocess and check
1286 # that the return code is equal to *sig*.
1287 import ctypes
1288 from ctypes import wintypes
1289 import msvcrt
1290
1291 # Since we can't access the contents of the process' stdout until the
1292 # process has exited, use PeekNamedPipe to see what's inside stdout
1293 # without waiting. This is done so we can tell that the interpreter
1294 # is started and running at a point where it could handle a signal.
1295 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1296 PeekNamedPipe.restype = wintypes.BOOL
1297 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1298 ctypes.POINTER(ctypes.c_char), # stdout buf
1299 wintypes.DWORD, # Buffer size
1300 ctypes.POINTER(wintypes.DWORD), # bytes read
1301 ctypes.POINTER(wintypes.DWORD), # bytes avail
1302 ctypes.POINTER(wintypes.DWORD)) # bytes left
1303 msg = "running"
1304 proc = subprocess.Popen([sys.executable, "-c",
1305 "import sys;"
1306 "sys.stdout.write('{}');"
1307 "sys.stdout.flush();"
1308 "input()".format(msg)],
1309 stdout=subprocess.PIPE,
1310 stderr=subprocess.PIPE,
1311 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001312 self.addCleanup(proc.stdout.close)
1313 self.addCleanup(proc.stderr.close)
1314 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001315
1316 count, max = 0, 100
1317 while count < max and proc.poll() is None:
1318 # Create a string buffer to store the result of stdout from the pipe
1319 buf = ctypes.create_string_buffer(len(msg))
1320 # Obtain the text currently in proc.stdout
1321 # Bytes read/avail/left are left as NULL and unused
1322 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1323 buf, ctypes.sizeof(buf), None, None, None)
1324 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1325 if buf.value:
1326 self.assertEqual(msg, buf.value.decode())
1327 break
1328 time.sleep(0.1)
1329 count += 1
1330 else:
1331 self.fail("Did not receive communication from the subprocess")
1332
Brian Curtineb24d742010-04-12 17:16:38 +00001333 os.kill(proc.pid, sig)
1334 self.assertEqual(proc.wait(), sig)
1335
1336 def test_kill_sigterm(self):
1337 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001338 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001339
1340 def test_kill_int(self):
1341 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001342 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001343
def _kill_with_event(self, event, name):
    """Send console control *event* to a child and check that it exits.

    event -- the value passed to os.kill() for the child process.
    name  -- human readable event name, used only in the failure message.

    A child running win_console_handler.py is started in a new process
    group.  A one byte tagged mmap is shared with it; the test waits until
    that byte reads 1 (presumably written by the child once its handler is
    installed -- the script is not visible from here) before signalling.
    """
    tagname = "test_os_%s" % uuid.uuid1()
    m = mmap.mmap(-1, 1, tagname)
    # Release the mapping even when the test fails part-way through.
    self.addCleanup(m.close)
    m[0] = 0
    # Run a script which has console control handling enabled.
    script = os.path.join(os.path.dirname(__file__),
                          "win_console_handler.py")
    proc = subprocess.Popen([sys.executable, script, tagname],
                            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
    # Let the interpreter startup before we send signals. See #3137.
    attempts_left = 100
    while attempts_left > 0 and proc.poll() is None:
        if m[0] == 1:
            break
        time.sleep(0.1)
        attempts_left -= 1
    else:
        # Reached on timeout or when the child died early.  Only a child
        # that is still alive can (and needs to) be killed forcefully.
        if proc.poll() is None:
            os.kill(proc.pid, signal.SIGINT)
        self.fail("Subprocess didn't finish initialization")
    os.kill(proc.pid, event)
    # proc.send_signal(event) could also be done here.
    # Allow time for the signal to be passed and the process to exit.
    time.sleep(0.5)
    # poll() returns None only while the child is running; an exit status
    # of 0 is falsy too, so compare against None explicitly.
    if proc.poll() is None:
        # Forcefully kill the process if we weren't able to signal it.
        os.kill(proc.pid, signal.SIGINT)
        self.fail("subprocess did not stop on {}".format(name))
1372
@unittest.skip("subprocesses aren't inheriting CTRL+C property")
def test_CTRL_C_EVENT(self):
    import ctypes
    from ctypes import wintypes

    handler_ptr_type = ctypes.POINTER(ctypes.c_int)
    set_ctrl_handler = ctypes.windll.kernel32.SetConsoleCtrlHandler
    set_ctrl_handler.restype = wintypes.BOOL
    set_ctrl_handler.argtypes = (handler_ptr_type, wintypes.BOOL)

    # A pointer instance created without an argument is a NULL pointer.
    null_handler = handler_ptr_type()

    # Passing NULL and FALSE makes the calling process handle CTRL+C
    # instead of ignoring it; subprocesses inherit that property.
    set_ctrl_handler(null_handler, 0)

    self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1391
def test_CTRL_BREAK_EVENT(self):
    # Deliver CTRL_BREAK_EVENT to the child through the shared helper.
    event = signal.CTRL_BREAK_EVENT
    self._kill_with_event(event, "CTRL_BREAK_EVENT")
1394
1395
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    # Tests for os.symlink() and for the stat/remove calls made on the
    # links it creates.  Link names are relative, so they are created in
    # the current working directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # unittest assertions (rather than bare ``assert``) keep these
        # precondition checks alive under ``python -O``.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # The missing link is dangling, so only lexists() can see it.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink, True)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() follows the link and must match the target; lstat()
        # describes the link itself and must differ.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        # Repeat with a bytes path, silencing the DeprecationWarning that
        # the bytes API may emit.
        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # os.stat() on a symlink with a relative target must give the same
        # answer whatever the current working directory is.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        try:
            os.mkdir(level1)
            # Register cleanup only once the tree exists.  This replaces a
            # ``finally`` that referenced names which are unbound when an
            # early mkdir() fails.
            self.addCleanup(shutil.rmtree, level1)
            os.mkdir(level2)
            os.mkdir(level3)

            file1 = os.path.abspath(os.path.join(level1, "file1"))

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from the dir above the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from the dir below the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                # Leave the tree before it gets removed by the cleanup.
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
1513
Brian Curtind40e6f72010-07-08 21:39:08 +00001514
class FSEncodingTests(unittest.TestCase):
    # Checks for os.fsencode() and os.fsdecode().

    def test_nop(self):
        # bytes handed to fsencode() and str handed to fsdecode() must
        # come back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # fsdecode(fsencode(x)) == x for every name that can be encoded
        samples = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in samples:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # this name cannot be encoded here; nothing to check
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001528
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001529
Brett Cannonefb00c02012-02-29 18:31:31 -05001530
class DeviceEncodingTests(unittest.TestCase):
    # Checks for os.device_encoding().

    def test_bad_fd(self):
        # A file descriptor that doesn't exist yields None.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # With fd 0 attached to a tty, an encoding name known to the
        # codecs registry is expected.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
1544
1545
class PidTests(unittest.TestCase):
    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # A child interpreter prints its parent pid, which must be ours.
        child_cmd = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(child_cmd, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        self.assertEqual(int(output), os.getpid())
1555
1556
# This TestCase triggered at least two different errors on *nix buildbots
# when it was introduced, so it is skipped unconditionally for now to let
# the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # The login name must not be empty.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
1565
1566
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Bump our own priority value by one and read it back.
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest(
                    "unable to reliably test setpriority at current nice "
                    "level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to put the original value back; an EACCES refusal is
            # tolerated, anything else propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1589
1590
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket served by asyncore from its own thread.

        Each accepted connection gets a Handler which greets the client
        and then stores everything it receives; the most recent handler is
        available as ``handler_instance``.
        """

        class Handler(asynchat.async_chat):
            """Per-connection handler that accumulates the received bytes."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled instead
                # of using the dispatcher's default handling.
                raise

        def __init__(self, address):
            """Bind to *address* and start listening (thread not started)."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the serving thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the serving loop to finish and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # ``with`` releases the lock even if the loop raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1674
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001675
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    # Sends the contents of support.TESTFN to a SendfileTestServer with
    # os.sendfile() and compares what the server received.

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are only exercised off Linux/Solaris.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
                               not sys.platform.startswith("solaris") and \
                               not sys.platform.startswith("sunos")

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while the call fails with EAGAIN or EBUSY; every other
        OSError (ECONNRESET on disconnect included) propagates.  Returns
        whatever os.sendfile() returns.  headers/trailers default to empty
        lists and are only passed on when SUPPORT_HEADERS_TRAILERS is set.
        """
        # None instead of a shared mutable list as the default value.
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare lengths first so a mismatch gives a short message.
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # a negative offset must be rejected with EINVAL
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            # the first call sends a 512 byte header ahead of the file data
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            # Compare the payload itself (length first), like the other
            # tests do, rather than only its hash.
            self.assertEqual(len(data), len(expected_data))
            self.assertEqual(data, expected_data)

        def test_trailers(self):
            # a trailer must arrive right after the file contents
            TESTFN2 = support.TESTFN + "2"
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            with open(TESTFN2, 'rb') as f:
                self.addCleanup(os.remove, TESTFN2)
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcde12345")

        if hasattr(os, "SF_NODISKIO"):
            def test_flags(self):
                # EBUSY and EAGAIN are acceptable outcomes with this flag
                try:
                    os.sendfile(self.sockno, self.fileno, 0, 4096,
                                flags=os.SF_NODISKIO)
                except OSError as err:
                    if err.errno not in (errno.EBUSY, errno.EAGAIN):
                        raise
1847
1848
@support.skip_unless_xattr
class ExtendedAttributeTests(unittest.TestCase):
    # Runs one scenario against the path based, link based and fd based
    # extended attribute functions of the os module.

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
        # *s* converts an attribute name to the type under test; the four
        # callables take a path first, like os.getxattr() and friends.
        path = support.TESTFN
        open(path, "wb").close()

        # Nothing is set yet on a fresh file.
        with self.assertRaises(OSError) as ctx:
            getxattr(path, s("user.test"))
        self.assertEqual(cm_errno(ctx) if False else ctx.exception.errno,
                         errno.ENODATA)
        init_xattr = listxattr(path)
        self.assertIsInstance(init_xattr, list)

        # Create an empty attribute, then replace its value.
        setxattr(path, s("user.test"), b"")
        expected = set(init_xattr)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test"), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE)
        self.assertEqual(getxattr(path, b"user.test"), b"hello")

        # XATTR_CREATE on an existing name and XATTR_REPLACE on a missing
        # one must both fail.
        with self.assertRaises(OSError) as ctx:
            setxattr(path, s("user.test"), b"bye", os.XATTR_CREATE)
        self.assertEqual(ctx.exception.errno, errno.EEXIST)
        with self.assertRaises(OSError) as ctx:
            setxattr(path, s("user.test2"), b"bye", os.XATTR_REPLACE)
        self.assertEqual(ctx.exception.errno, errno.ENODATA)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removal makes the attribute disappear again.
        removexattr(path, s("user.test"))
        with self.assertRaises(OSError) as ctx:
            getxattr(path, s("user.test"))
        self.assertEqual(ctx.exception.errno, errno.ENODATA)
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2")), b"foo")

        # A 1024 byte value round-trips.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value)
        self.assertEqual(getxattr(path, s("user.test")), b"a"*1024)
        removexattr(path, s("user.test"))

        # Many attributes at once.
        bulk_names = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in bulk_names:
            setxattr(path, attr_name, b"x")
        self.assertEqual(set(listxattr(path)),
                         set(init_xattr) | set(bulk_names))

    def _check_xattrs(self, *args):
        # Run the scenario with str names, then again with bytes names on
        # a fresh file.
        def as_ascii_bytes(name):
            return name.encode("ascii")
        self._check_xattrs_str(str, *args)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(as_ascii_bytes, *args)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
                           os.llistxattr)

    def test_fds(self):
        # Adapt the fd based functions to the path based signature by
        # opening the file around every call.
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.fgetxattr(fp.fileno(), *args)
        def setxattr(path, *args):
            with open(path, "wb") as fp:
                os.fsetxattr(fp.fileno(), *args)
        def removexattr(path, *args):
            with open(path, "wb") as fp:
                os.fremovexattr(fp.fileno(), *args)
        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.flistxattr(fp.fileno(), *args)
        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1923
1924
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    # Calling these functions with bytes filenames must emit a
    # DeprecationWarning.

    def test_deprecated(self):
        import nt
        bytes_name = os.fsencode(support.TESTFN)
        # (callable, positional arguments) pairs to exercise.
        calls = (
            (nt._getfullpathname, (bytes_name,)),
            (nt._isdir, (bytes_name,)),
            (os.access, (bytes_name, os.R_OK)),
            (os.chdir, (bytes_name,)),
            (os.chmod, (bytes_name, 0o777)),
            (os.getcwdb, ()),
            (os.link, (bytes_name, bytes_name)),
            (os.listdir, (bytes_name,)),
            (os.lstat, (bytes_name,)),
            (os.mkdir, (bytes_name,)),
            (os.open, (bytes_name, os.O_RDONLY)),
            (os.rename, (bytes_name, bytes_name)),
            (os.rmdir, (bytes_name,)),
            (os.startfile, (bytes_name,)),
            (os.stat, (bytes_name,)),
            (os.unlink, (bytes_name,)),
            (os.utime, (bytes_name,)),
        )
        with warnings.catch_warnings():
            # Turn the warning into an exception so that it can be
            # checked with assertRaises.
            warnings.simplefilter("error", DeprecationWarning)
            for func, args in calls:
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            self.assertRaises(DeprecationWarning,
                              os.symlink, bytes_name, bytes_name)
1960
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001961
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        The real terminal size cannot be checked in an easy, portable
        way, so this only verifies that the result looks sensible.
        """
        try:
            size = os.get_terminal_size()
        except OSError as exc:
            no_terminal = exc.errno in (errno.EINVAL, errno.ENOTTY)
            if sys.platform == "win32" or no_terminal:
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty inspects stdin, so get_terminal_size() is called on stdin
        explicitly.  Whenever stty succeeds, get_terminal_size() is
        expected to succeed as well.
        """
        try:
            output = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        fields = output.decode().split()
        # stty prints the two numbers in the opposite order
        expected = (int(fields[1]), int(fields[0]))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            no_terminal = exc.errno in (errno.EINVAL, errno.ENOTTY)
            if sys.platform == "win32" or no_terminal:
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise
        self.assertEqual(expected, actual)
2004
2005
@support.reap_threads
def test_main():
    # Every test case of this module, in the order it is run.
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
    )
    support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()