blob: 9a84ba1787a495457aec46f6471c121584cfe3d8 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000027try:
28 import threading
29except ImportError:
30 threading = None
Georg Brandl2daf6ae2012-02-20 19:54:16 +010031from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000032
Victor Stinner1aa54a42012-02-08 04:09:37 +010033os.stat_float_times(True)
34st = os.stat(__file__)
35stat_supports_subsecond = (
36 # check if float and int timestamps are different
37 (st.st_atime != st[7])
38 or (st.st_mtime != st[8])
39 or (st.st_ctime != st[9]))
40
Mark Dickinson7cf03892010-04-16 13:45:35 +000041# Detect whether we're on a Linux system that uses the (now outdated
42# and unmaintained) linuxthreads threading library. There's an issue
43# when combining linuxthreads with a failed execv call: see
44# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020045if hasattr(sys, 'thread_info') and sys.thread_info.version:
46 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
47else:
48 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000049
Thomas Wouters0e3f5912006-08-11 14:57:12 +000050# Tests creating TESTFN
51class FileTests(unittest.TestCase):
52 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000053 if os.path.exists(support.TESTFN):
54 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000055 tearDown = setUp
56
57 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000058 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +000059 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +000060 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000061
Christian Heimesfdab48e2008-01-20 09:06:41 +000062 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000063 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
64 # We must allocate two consecutive file descriptors, otherwise
65 # it will mess up other file descriptors (perhaps even the three
66 # standard ones).
67 second = os.dup(first)
68 try:
69 retries = 0
70 while second != first + 1:
71 os.close(first)
72 retries += 1
73 if retries > 10:
74 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +000075 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000076 first, second = second, os.dup(second)
77 finally:
78 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +000079 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +000080 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000081 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000082
Benjamin Peterson1cc6df92010-06-30 17:39:45 +000083 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +000084 def test_rename(self):
85 path = support.TESTFN
86 old = sys.getrefcount(path)
87 self.assertRaises(TypeError, os.rename, path, 0)
88 new = sys.getrefcount(path)
89 self.assertEqual(old, new)
90
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +000091 def test_read(self):
92 with open(support.TESTFN, "w+b") as fobj:
93 fobj.write(b"spam")
94 fobj.flush()
95 fd = fobj.fileno()
96 os.lseek(fd, 0, 0)
97 s = os.read(fd, 4)
98 self.assertEqual(type(s), bytes)
99 self.assertEqual(s, b"spam")
100
101 def test_write(self):
102 # os.write() accepts bytes- and buffer-like objects but not strings
103 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
104 self.assertRaises(TypeError, os.write, fd, "beans")
105 os.write(fd, b"bacon\n")
106 os.write(fd, bytearray(b"eggs\n"))
107 os.write(fd, memoryview(b"spam\n"))
108 os.close(fd)
109 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000110 self.assertEqual(fobj.read().splitlines(),
111 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000112
Victor Stinnere0daff12011-03-20 23:36:35 +0100113 def write_windows_console(self, *args):
114 retcode = subprocess.call(args,
115 # use a new console to not flood the test output
116 creationflags=subprocess.CREATE_NEW_CONSOLE,
117 # use a shell to hide the console window (SW_HIDE)
118 shell=True)
119 self.assertEqual(retcode, 0)
120
121 @unittest.skipUnless(sys.platform == 'win32',
122 'test specific to the Windows console')
123 def test_write_windows_console(self):
124 # Issue #11395: the Windows console returns an error (12: not enough
125 # space error) on writing into stdout if stdout mode is binary and the
126 # length is greater than 66,000 bytes (or less, depending on heap
127 # usage).
128 code = "print('x' * 100000)"
129 self.write_windows_console(sys.executable, "-c", code)
130 self.write_windows_console(sys.executable, "-u", "-c", code)
131
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000132 def fdopen_helper(self, *args):
133 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200134 f = os.fdopen(fd, *args)
135 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000136
137 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200138 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
139 os.close(fd)
140
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000141 self.fdopen_helper()
142 self.fdopen_helper('r')
143 self.fdopen_helper('r', 100)
144
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100145 def test_replace(self):
146 TESTFN2 = support.TESTFN + ".2"
147 with open(support.TESTFN, 'w') as f:
148 f.write("1")
149 with open(TESTFN2, 'w') as f:
150 f.write("2")
151 self.addCleanup(os.unlink, TESTFN2)
152 os.replace(support.TESTFN, TESTFN2)
153 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
154 with open(TESTFN2, 'r') as f:
155 self.assertEqual(f.read(), "1")
156
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200157
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000158# Test attributes on return values from os.*stat* family.
159class StatAttributeTests(unittest.TestCase):
160 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000161 os.mkdir(support.TESTFN)
162 self.fname = os.path.join(support.TESTFN, "f1")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000163 f = open(self.fname, 'wb')
Guido van Rossum26d95c32007-08-27 23:18:54 +0000164 f.write(b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000165 f.close()
Tim Peterse0c446b2001-10-18 21:57:37 +0000166
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000167 def tearDown(self):
168 os.unlink(self.fname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000169 os.rmdir(support.TESTFN)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000170
Antoine Pitrou38425292010-09-21 18:19:07 +0000171 def check_stat_attributes(self, fname):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000172 if not hasattr(os, "stat"):
173 return
174
Antoine Pitrou38425292010-09-21 18:19:07 +0000175 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000176
177 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000178 self.assertEqual(result[stat.ST_SIZE], 3)
179 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000180
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000181 # Make sure all the attributes are there
182 members = dir(result)
183 for name in dir(stat):
184 if name[:3] == 'ST_':
185 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000186 if name.endswith("TIME"):
187 def trunc(x): return int(x)
188 else:
189 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000190 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000191 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000192 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000193
Larry Hastings6fe20b32012-04-19 15:07:49 -0700194 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700195 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700196 for name in 'st_atime st_mtime st_ctime'.split():
197 floaty = int(getattr(result, name) * 100000)
198 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700199 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700200
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000201 try:
202 result[200]
203 self.fail("No exception thrown")
204 except IndexError:
205 pass
206
207 # Make sure that assignment fails
208 try:
209 result.st_mode = 1
210 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000211 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000212 pass
213
214 try:
215 result.st_rdev = 1
216 self.fail("No exception thrown")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000217 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000218 pass
219
220 try:
221 result.parrot = 1
222 self.fail("No exception thrown")
223 except AttributeError:
224 pass
225
226 # Use the stat_result constructor with a too-short tuple.
227 try:
228 result2 = os.stat_result((10,))
229 self.fail("No exception thrown")
230 except TypeError:
231 pass
232
Ezio Melotti42da6632011-03-15 05:18:48 +0200233 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000234 try:
235 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
236 except TypeError:
237 pass
238
Antoine Pitrou38425292010-09-21 18:19:07 +0000239 def test_stat_attributes(self):
240 self.check_stat_attributes(self.fname)
241
242 def test_stat_attributes_bytes(self):
243 try:
244 fname = self.fname.encode(sys.getfilesystemencoding())
245 except UnicodeEncodeError:
246 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100247 with warnings.catch_warnings():
248 warnings.simplefilter("ignore", DeprecationWarning)
249 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000250
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000251 def test_statvfs_attributes(self):
252 if not hasattr(os, "statvfs"):
253 return
254
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000255 try:
256 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000257 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000258 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000259 if e.errno == errno.ENOSYS:
260 return
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000261
262 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000263 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000264
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000265 # Make sure all the attributes are there.
266 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
267 'ffree', 'favail', 'flag', 'namemax')
268 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000269 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000270
271 # Make sure that assignment really fails
272 try:
273 result.f_bfree = 1
274 self.fail("No exception thrown")
Collin Winter42dae6a2007-03-28 21:44:53 +0000275 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000276 pass
277
278 try:
279 result.parrot = 1
280 self.fail("No exception thrown")
281 except AttributeError:
282 pass
283
284 # Use the constructor with a too-short tuple.
285 try:
286 result2 = os.statvfs_result((10,))
287 self.fail("No exception thrown")
288 except TypeError:
289 pass
290
Ezio Melotti42da6632011-03-15 05:18:48 +0200291 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000292 try:
293 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
294 except TypeError:
295 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000296
Thomas Wouters89f507f2006-12-13 04:49:30 +0000297 def test_utime_dir(self):
298 delta = 1000000
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000299 st = os.stat(support.TESTFN)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000300 # round to int, because some systems may support sub-second
301 # time stamps in stat, but not in utime.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000302 os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
303 st2 = os.stat(support.TESTFN)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000304 self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
Thomas Wouters89f507f2006-12-13 04:49:30 +0000305
Larry Hastings76ad59b2012-05-03 00:30:07 -0700306 def _test_utime(self, filename, attr, utime, delta):
Brian Curtin0277aa32011-11-06 13:50:15 -0600307 # Issue #13327 removed the requirement to pass None as the
Brian Curtin52fbea12011-11-06 13:41:17 -0600308 # second argument. Check that the previous methods of passing
309 # a time tuple or None work in addition to no argument.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700310 st0 = os.stat(filename)
Brian Curtin52fbea12011-11-06 13:41:17 -0600311 # Doesn't set anything new, but sets the time tuple way
Larry Hastings76ad59b2012-05-03 00:30:07 -0700312 utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
313 # Setting the time to the time you just read, then reading again,
314 # should always return exactly the same times.
315 st1 = os.stat(filename)
316 self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
317 self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
Brian Curtin52fbea12011-11-06 13:41:17 -0600318 # Set to the current time in the old explicit way.
Larry Hastings76ad59b2012-05-03 00:30:07 -0700319 os.utime(filename, None)
Brian Curtin52fbea12011-11-06 13:41:17 -0600320 st2 = os.stat(support.TESTFN)
Larry Hastings76ad59b2012-05-03 00:30:07 -0700321 # Set to the current time in the new way
322 os.utime(filename)
323 st3 = os.stat(filename)
324 self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)
325
326 def test_utime(self):
327 def utime(file, times):
328 return os.utime(file, times)
329 self._test_utime(self.fname, getattr, utime, 10)
330 self._test_utime(support.TESTFN, getattr, utime, 10)
331
332
333 def _test_utime_ns(self, set_times_ns, test_dir=True):
334 def getattr_ns(o, attr):
335 return getattr(o, attr + "_ns")
336 ten_s = 10 * 1000 * 1000 * 1000
337 self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
338 if test_dir:
339 self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)
340
341 def test_utime_ns(self):
342 def utime_ns(file, times):
343 return os.utime(file, ns=times)
344 self._test_utime_ns(utime_ns)
345
346 requires_lutimes = unittest.skipUnless(hasattr(os, 'lutimes'),
347 "os.lutimes required for this test.")
348 requires_futimes = unittest.skipUnless(hasattr(os, 'futimes'),
349 "os.futimes required for this test.")
350
351 @requires_lutimes
352 def test_lutimes_ns(self):
353 def lutimes_ns(file, times):
354 return os.lutimes(file, ns=times)
355 self._test_utime_ns(lutimes_ns)
356
357 @requires_futimes
358 def test_futimes_ns(self):
359 def futimes_ns(file, times):
360 with open(file, "wb") as f:
361 os.futimes(f.fileno(), ns=times)
362 self._test_utime_ns(futimes_ns, test_dir=False)
363
364 def _utime_invalid_arguments(self, name, arg):
365 with self.assertRaises(RuntimeError):
366 getattr(os, name)(arg, (5, 5), ns=(5, 5))
367
368 def test_utime_invalid_arguments(self):
369 self._utime_invalid_arguments('utime', self.fname)
370
371 @requires_lutimes
372 def test_lutimes_invalid_arguments(self):
373 self._utime_invalid_arguments('lutimes', self.fname)
374
375 @requires_futimes
376 def test_futimes_invalid_arguments(self):
377 with open(self.fname, "wb") as f:
378 self._utime_invalid_arguments('futimes', f.fileno())
379
Brian Curtin52fbea12011-11-06 13:41:17 -0600380
Victor Stinner1aa54a42012-02-08 04:09:37 +0100381 @unittest.skipUnless(stat_supports_subsecond,
382 "os.stat() doesn't has a subsecond resolution")
Victor Stinnera2f7c002012-02-08 03:36:25 +0100383 def _test_utime_subsecond(self, set_time_func):
Victor Stinnerbe557de2012-02-08 03:01:11 +0100384 asec, amsec = 1, 901
385 atime = asec + amsec * 1e-3
Victor Stinnera2f7c002012-02-08 03:36:25 +0100386 msec, mmsec = 2, 901
Victor Stinnerbe557de2012-02-08 03:01:11 +0100387 mtime = msec + mmsec * 1e-3
388 filename = self.fname
Victor Stinnera2f7c002012-02-08 03:36:25 +0100389 os.utime(filename, (0, 0))
390 set_time_func(filename, atime, mtime)
Victor Stinner1aa54a42012-02-08 04:09:37 +0100391 os.stat_float_times(True)
Victor Stinnera2f7c002012-02-08 03:36:25 +0100392 st = os.stat(filename)
393 self.assertAlmostEqual(st.st_atime, atime, places=3)
394 self.assertAlmostEqual(st.st_mtime, mtime, places=3)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100395
Victor Stinnera2f7c002012-02-08 03:36:25 +0100396 def test_utime_subsecond(self):
397 def set_time(filename, atime, mtime):
398 os.utime(filename, (atime, mtime))
399 self._test_utime_subsecond(set_time)
400
Larry Hastings76ad59b2012-05-03 00:30:07 -0700401 @requires_futimes
Victor Stinnera2f7c002012-02-08 03:36:25 +0100402 def test_futimes_subsecond(self):
403 def set_time(filename, atime, mtime):
404 with open(filename, "wb") as f:
405 os.futimes(f.fileno(), (atime, mtime))
406 self._test_utime_subsecond(set_time)
407
408 @unittest.skipUnless(hasattr(os, 'futimens'),
409 "os.futimens required for this test.")
410 def test_futimens_subsecond(self):
411 def set_time(filename, atime, mtime):
412 with open(filename, "wb") as f:
413 asec, ansec = divmod(atime, 1.0)
414 asec = int(asec)
415 ansec = int(ansec * 1e9)
416 msec, mnsec = divmod(mtime, 1.0)
417 msec = int(msec)
418 mnsec = int(mnsec * 1e9)
419 os.futimens(f.fileno(),
420 (asec, ansec),
421 (msec, mnsec))
422 self._test_utime_subsecond(set_time)
423
424 @unittest.skipUnless(hasattr(os, 'futimesat'),
425 "os.futimesat required for this test.")
426 def test_futimesat_subsecond(self):
427 def set_time(filename, atime, mtime):
428 dirname = os.path.dirname(filename)
429 dirfd = os.open(dirname, os.O_RDONLY)
430 try:
431 os.futimesat(dirfd, os.path.basename(filename),
432 (atime, mtime))
433 finally:
434 os.close(dirfd)
435 self._test_utime_subsecond(set_time)
436
Larry Hastings76ad59b2012-05-03 00:30:07 -0700437 @requires_lutimes
Victor Stinnera2f7c002012-02-08 03:36:25 +0100438 def test_lutimes_subsecond(self):
439 def set_time(filename, atime, mtime):
440 os.lutimes(filename, (atime, mtime))
441 self._test_utime_subsecond(set_time)
442
443 @unittest.skipUnless(hasattr(os, 'utimensat'),
444 "os.utimensat required for this test.")
445 def test_utimensat_subsecond(self):
446 def set_time(filename, atime, mtime):
447 dirname = os.path.dirname(filename)
448 dirfd = os.open(dirname, os.O_RDONLY)
449 try:
450 asec, ansec = divmod(atime, 1.0)
451 asec = int(asec)
452 ansec = int(ansec * 1e9)
453 msec, mnsec = divmod(mtime, 1.0)
454 msec = int(msec)
455 mnsec = int(mnsec * 1e9)
456 os.utimensat(dirfd, os.path.basename(filename),
457 (asec, ansec),
458 (msec, mnsec))
459 finally:
460 os.close(dirfd)
461 self._test_utime_subsecond(set_time)
Victor Stinnerbe557de2012-02-08 03:01:11 +0100462
Thomas Wouters89f507f2006-12-13 04:49:30 +0000463 # Restrict test to Win32, since there is no guarantee other
464 # systems support centiseconds
465 if sys.platform == 'win32':
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000466 def get_file_system(path):
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000467 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000468 import ctypes
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000469 kernel32 = ctypes.windll.kernel32
Hirokazu Yamamoto5ef6d182008-08-20 04:17:24 +0000470 buf = ctypes.create_unicode_buffer("", 100)
Hirokazu Yamamotoca765d52008-08-20 16:18:19 +0000471 if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000472 return buf.value
473
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000474 if get_file_system(support.TESTFN) == "NTFS":
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000475 def test_1565150(self):
476 t1 = 1159195039.25
477 os.utime(self.fname, (t1, t1))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000478 self.assertEqual(os.stat(self.fname).st_mtime, t1)
Thomas Wouters89f507f2006-12-13 04:49:30 +0000479
Amaury Forgeot d'Arca251a852011-01-03 00:19:11 +0000480 def test_large_time(self):
481 t1 = 5000000000 # some day in 2128
482 os.utime(self.fname, (t1, t1))
483 self.assertEqual(os.stat(self.fname).st_mtime, t1)
484
Guido van Rossumd8faa362007-04-27 19:54:29 +0000485 def test_1686475(self):
486 # Verify that an open file can be stat'ed
487 try:
488 os.stat(r"c:\pagefile.sys")
489 except WindowsError as e:
Benjamin Petersonc4fe6f32008-08-19 18:57:56 +0000490 if e.errno == 2: # file does not exist; cannot run test
Guido van Rossumd8faa362007-04-27 19:54:29 +0000491 return
492 self.fail("Could not stat pagefile.sys")
493
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000494from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000495
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000496class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000497 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000498 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000499
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000500 def setUp(self):
501 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000502 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000503 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000504 for key, value in self._reference().items():
505 os.environ[key] = value
506
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000507 def tearDown(self):
508 os.environ.clear()
509 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000510 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000511 os.environb.clear()
512 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000513
Christian Heimes90333392007-11-01 19:08:42 +0000514 def _reference(self):
515 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
516
517 def _empty_mapping(self):
518 os.environ.clear()
519 return os.environ
520
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000521 # Bug 1110478
Martin v. Löwis5510f652005-02-17 21:23:20 +0000522 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000523 os.environ.clear()
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000524 if os.path.exists("/bin/sh"):
525 os.environ.update(HELLO="World")
Brian Curtin810921b2010-10-30 21:24:21 +0000526 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
527 value = popen.read().strip()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000528 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000529
Christian Heimes1a13d592007-11-08 14:16:55 +0000530 def test_os_popen_iter(self):
531 if os.path.exists("/bin/sh"):
Brian Curtin810921b2010-10-30 21:24:21 +0000532 with os.popen(
533 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
534 it = iter(popen)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000535 self.assertEqual(next(it), "line1\n")
536 self.assertEqual(next(it), "line2\n")
537 self.assertEqual(next(it), "line3\n")
Brian Curtin810921b2010-10-30 21:24:21 +0000538 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000539
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000540 # Verify environ keys and values from the OS are of the
541 # correct str type.
542 def test_keyvalue_types(self):
543 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000544 self.assertEqual(type(key), str)
545 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000546
Christian Heimes90333392007-11-01 19:08:42 +0000547 def test_items(self):
548 for key, value in self._reference().items():
549 self.assertEqual(os.environ.get(key), value)
550
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000551 # Issue 7310
552 def test___repr__(self):
553 """Check that the repr() of os.environ looks like environ({...})."""
554 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000555 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
556 '{!r}: {!r}'.format(key, value)
557 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000558
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000559 def test_get_exec_path(self):
560 defpath_list = os.defpath.split(os.pathsep)
561 test_path = ['/monty', '/python', '', '/flying/circus']
562 test_env = {'PATH': os.pathsep.join(test_path)}
563
564 saved_environ = os.environ
565 try:
566 os.environ = dict(test_env)
567 # Test that defaulting to os.environ works.
568 self.assertSequenceEqual(test_path, os.get_exec_path())
569 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
570 finally:
571 os.environ = saved_environ
572
573 # No PATH environment variable
574 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
575 # Empty PATH environment variable
576 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
577 # Supplied PATH environment variable
578 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
579
Victor Stinnerb745a742010-05-18 17:17:23 +0000580 if os.supports_bytes_environ:
581 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000582 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000583 # ignore BytesWarning warning
584 with warnings.catch_warnings(record=True):
585 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000586 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000587 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000588 pass
589 else:
590 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000591
592 # bytes key and/or value
593 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
594 ['abc'])
595 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
596 ['abc'])
597 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
598 ['abc'])
599
600 @unittest.skipUnless(os.supports_bytes_environ,
601 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000602 def test_environb(self):
603 # os.environ -> os.environb
604 value = 'euro\u20ac'
605 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000606 value_bytes = value.encode(sys.getfilesystemencoding(),
607 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000608 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000609 msg = "U+20AC character is not encodable to %s" % (
610 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000611 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000612 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000613 self.assertEqual(os.environ['unicode'], value)
614 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000615
616 # os.environb -> os.environ
617 value = b'\xff'
618 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000619 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000620 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000621 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000622
Charles-François Natali2966f102011-11-26 11:32:46 +0100623 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
624 # #13415).
625 @support.requires_freebsd_version(7)
626 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100627 def test_unset_error(self):
628 if sys.platform == "win32":
629 # an environment variable is limited to 32,767 characters
630 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100631 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100632 else:
633 # "=" is not allowed in a variable name
634 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100635 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100636
Tim Petersc4e09402003-04-25 07:11:48 +0000637class WalkTests(unittest.TestCase):
638 """Tests for os.walk()."""
639
Charles-François Natali7372b062012-02-05 15:15:38 +0100640 def setUp(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000641 import os
642 from os.path import join
643
644 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000645 # TESTFN/
646 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000647 # tmp1
648 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000649 # tmp2
650 # SUB11/ no kids
651 # SUB2/ a file kid and a dirsymlink kid
652 # tmp3
653 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200654 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000655 # TEST2/
656 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000657 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000658 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000659 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000660 sub2_path = join(walk_path, "SUB2")
661 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000662 tmp2_path = join(sub1_path, "tmp2")
663 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000664 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000665 t2_path = join(support.TESTFN, "TEST2")
666 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200667 link_path = join(sub2_path, "link")
668 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000669
670 # Create stuff.
671 os.makedirs(sub11_path)
672 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000673 os.makedirs(t2_path)
674 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000675 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000676 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
677 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000678 if support.can_symlink():
Antoine Pitrou5311c1d2012-01-24 08:59:28 +0100679 if os.name == 'nt':
680 def symlink_to_dir(src, dest):
681 os.symlink(src, dest, True)
682 else:
683 symlink_to_dir = os.symlink
684 symlink_to_dir(os.path.abspath(t2_path), link_path)
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200685 symlink_to_dir('broken', broken_link_path)
686 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
Guido van Rossumd8faa362007-04-27 19:54:29 +0000687 else:
688 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000689
690 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000691 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000692 self.assertEqual(len(all), 4)
693 # We can't know which order SUB1 and SUB2 will appear in.
694 # Not flipped: TESTFN, SUB1, SUB11, SUB2
695 # flipped: TESTFN, SUB2, SUB1, SUB11
696 flipped = all[0][1][0] != "SUB1"
697 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200698 all[3 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000699 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000700 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
701 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000702 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000703
704 # Prune the search.
705 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000706 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000707 all.append((root, dirs, files))
708 # Don't descend into SUB1.
709 if 'SUB1' in dirs:
710 # Note that this also mutates the dirs we appended to all!
711 dirs.remove('SUB1')
712 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000713 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
714 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000715
716 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000717 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000718 self.assertEqual(len(all), 4)
719 # We can't know which order SUB1 and SUB2 will appear in.
720 # Not flipped: SUB11, SUB1, SUB2, TESTFN
721 # flipped: SUB2, SUB11, SUB1, TESTFN
722 flipped = all[3][1][0] != "SUB1"
723 all[3][1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000724 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000725 self.assertEqual(all[flipped], (sub11_path, [], []))
726 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000727 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000728
Brian Curtin3b4499c2010-12-28 14:31:47 +0000729 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000730 # Walk, following symlinks.
731 for root, dirs, files in os.walk(walk_path, followlinks=True):
732 if root == link_path:
733 self.assertEqual(dirs, [])
734 self.assertEqual(files, ["tmp4"])
735 break
736 else:
737 self.fail("Didn't follow symlink with followlinks=True")
738
739 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000740 # Tear everything down. This is a decent use for bottom-up on
741 # Windows, which doesn't have a recursive delete command. The
742 # (not so) subtlety is that rmdir will fail unless the dir's
743 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000744 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000745 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000746 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000747 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000748 dirname = os.path.join(root, name)
749 if not os.path.islink(dirname):
750 os.rmdir(dirname)
751 else:
752 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000753 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000754
Charles-François Natali7372b062012-02-05 15:15:38 +0100755
756@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
757class FwalkTests(WalkTests):
758 """Tests for os.fwalk()."""
759
760 def test_compare_to_walk(self):
761 # compare with walk() results
762 for topdown, followlinks in itertools.product((True, False), repeat=2):
763 args = support.TESTFN, topdown, None, followlinks
764 expected = {}
765 for root, dirs, files in os.walk(*args):
766 expected[root] = (set(dirs), set(files))
767
768 for root, dirs, files, rootfd in os.fwalk(*args):
769 self.assertIn(root, expected)
770 self.assertEqual(expected[root], (set(dirs), set(files)))
771
772 def test_dir_fd(self):
773 # check returned file descriptors
774 for topdown, followlinks in itertools.product((True, False), repeat=2):
775 args = support.TESTFN, topdown, None, followlinks
776 for root, dirs, files, rootfd in os.fwalk(*args):
777 # check that the FD is valid
778 os.fstat(rootfd)
Charles-François Natali77940902012-02-06 19:54:48 +0100779 # check that flistdir() returns consistent information
780 self.assertEqual(set(os.flistdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +0100781
782 def test_fd_leak(self):
783 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
784 # we both check that calling fwalk() a large number of times doesn't
785 # yield EMFILE, and that the minimum allocated FD hasn't changed.
786 minfd = os.dup(1)
787 os.close(minfd)
788 for i in range(256):
789 for x in os.fwalk(support.TESTFN):
790 pass
791 newfd = os.dup(1)
792 self.addCleanup(os.close, newfd)
793 self.assertEqual(newfd, minfd)
794
795 def tearDown(self):
796 # cleanup
797 for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
798 for name in files:
799 os.unlinkat(rootfd, name)
800 for name in dirs:
801 st = os.fstatat(rootfd, name, os.AT_SYMLINK_NOFOLLOW)
802 if stat.S_ISDIR(st.st_mode):
803 os.unlinkat(rootfd, name, os.AT_REMOVEDIR)
804 else:
805 os.unlinkat(rootfd, name)
806 os.rmdir(support.TESTFN)
807
808
Guido van Rossume7ba4952007-06-06 23:52:48 +0000809class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000810 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000811 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000812
813 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000814 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000815 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
816 os.makedirs(path) # Should work
817 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
818 os.makedirs(path)
819
820 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000821 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000822 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
823 os.makedirs(path)
824 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
825 'dir5', 'dir6')
826 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000827
Terry Reedy5a22b652010-12-02 07:05:56 +0000828 def test_exist_ok_existing_directory(self):
829 path = os.path.join(support.TESTFN, 'dir1')
830 mode = 0o777
831 old_mask = os.umask(0o022)
832 os.makedirs(path, mode)
833 self.assertRaises(OSError, os.makedirs, path, mode)
834 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
835 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
836 os.makedirs(path, mode=mode, exist_ok=True)
837 os.umask(old_mask)
838
839 def test_exist_ok_existing_regular_file(self):
840 base = support.TESTFN
841 path = os.path.join(support.TESTFN, 'dir1')
842 f = open(path, 'w')
843 f.write('abc')
844 f.close()
845 self.assertRaises(OSError, os.makedirs, path)
846 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
847 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
848 os.remove(path)
849
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000850 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000851 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000852 'dir4', 'dir5', 'dir6')
853 # If the tests failed, the bottom-most directory ('../dir6')
854 # may not have been created, so we look for the outermost directory
855 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000856 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000857 path = os.path.dirname(path)
858
859 os.removedirs(path)
860
Guido van Rossume7ba4952007-06-06 23:52:48 +0000861class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +0000862 def test_devnull(self):
Victor Stinnera6d2c762011-06-30 18:20:11 +0200863 with open(os.devnull, 'wb') as f:
864 f.write(b'hello')
865 f.close()
866 with open(os.devnull, 'rb') as f:
867 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000868
Guido van Rossume7ba4952007-06-06 23:52:48 +0000869class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +0100870 def test_urandom_length(self):
871 self.assertEqual(len(os.urandom(0)), 0)
872 self.assertEqual(len(os.urandom(1)), 1)
873 self.assertEqual(len(os.urandom(10)), 10)
874 self.assertEqual(len(os.urandom(100)), 100)
875 self.assertEqual(len(os.urandom(1000)), 1000)
876
877 def test_urandom_value(self):
878 data1 = os.urandom(16)
879 data2 = os.urandom(16)
880 self.assertNotEqual(data1, data2)
881
882 def get_urandom_subprocess(self, count):
883 code = '\n'.join((
884 'import os, sys',
885 'data = os.urandom(%s)' % count,
886 'sys.stdout.buffer.write(data)',
887 'sys.stdout.buffer.flush()'))
888 out = assert_python_ok('-c', code)
889 stdout = out[1]
890 self.assertEqual(len(stdout), 16)
891 return stdout
892
893 def test_urandom_subprocess(self):
894 data1 = self.get_urandom_subprocess(16)
895 data2 = self.get_urandom_subprocess(16)
896 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000897
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000898@contextlib.contextmanager
899def _execvpe_mockup(defpath=None):
900 """
901 Stubs out execv and execve functions when used as context manager.
902 Records exec calls. The mock execv and execve functions always raise an
903 exception as they would normally never return.
904 """
905 # A list of tuples containing (function name, first arg, args)
906 # of calls to execv or execve that have been made.
907 calls = []
908
909 def mock_execv(name, *args):
910 calls.append(('execv', name, args))
911 raise RuntimeError("execv called")
912
913 def mock_execve(name, *args):
914 calls.append(('execve', name, args))
915 raise OSError(errno.ENOTDIR, "execve called")
916
917 try:
918 orig_execv = os.execv
919 orig_execve = os.execve
920 orig_defpath = os.defpath
921 os.execv = mock_execv
922 os.execve = mock_execve
923 if defpath is not None:
924 os.defpath = defpath
925 yield calls
926 finally:
927 os.execv = orig_execv
928 os.execve = orig_execve
929 os.defpath = orig_defpath
930
Guido van Rossume7ba4952007-06-06 23:52:48 +0000931class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000932 @unittest.skipIf(USING_LINUXTHREADS,
933 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +0000934 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +0000935 self.assertRaises(OSError, os.execvpe, 'no such app-',
936 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +0000937
Thomas Heller6790d602007-08-30 17:15:14 +0000938 def test_execvpe_with_bad_arglist(self):
939 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
940
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000941 @unittest.skipUnless(hasattr(os, '_execvpe'),
942 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +0000943 def _test_internal_execvpe(self, test_type):
944 program_path = os.sep + 'absolutepath'
945 if test_type is bytes:
946 program = b'executable'
947 fullpath = os.path.join(os.fsencode(program_path), program)
948 native_fullpath = fullpath
949 arguments = [b'progname', 'arg1', 'arg2']
950 else:
951 program = 'executable'
952 arguments = ['progname', 'arg1', 'arg2']
953 fullpath = os.path.join(program_path, program)
954 if os.name != "nt":
955 native_fullpath = os.fsencode(fullpath)
956 else:
957 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000958 env = {'spam': 'beans'}
959
Victor Stinnerb745a742010-05-18 17:17:23 +0000960 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000961 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000962 self.assertRaises(RuntimeError,
963 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000964 self.assertEqual(len(calls), 1)
965 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
966
Victor Stinnerb745a742010-05-18 17:17:23 +0000967 # test os._execvpe() with a relative path:
968 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000969 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +0000970 self.assertRaises(OSError,
971 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000972 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +0000973 self.assertSequenceEqual(calls[0],
974 ('execve', native_fullpath, (arguments, env)))
975
976 # test os._execvpe() with a relative path:
977 # os.get_exec_path() reads the 'PATH' variable
978 with _execvpe_mockup() as calls:
979 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +0000980 if test_type is bytes:
981 env_path[b'PATH'] = program_path
982 else:
983 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +0000984 self.assertRaises(OSError,
985 os._execvpe, program, arguments, env=env_path)
986 self.assertEqual(len(calls), 1)
987 self.assertSequenceEqual(calls[0],
988 ('execve', native_fullpath, (arguments, env_path)))
989
990 def test_internal_execvpe_str(self):
991 self._test_internal_execvpe(str)
992 if os.name != "nt":
993 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000994
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000995
Thomas Wouters477c8d52006-05-27 19:21:47 +0000996class Win32ErrorTests(unittest.TestCase):
997 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000998 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000999
1000 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001001 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001002
1003 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001004 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001005
1006 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001007 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001008 try:
1009 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
1010 finally:
1011 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001012 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001013
1014 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001015 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001016
Thomas Wouters477c8d52006-05-27 19:21:47 +00001017 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +00001018 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001019
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001020class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001021 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001022 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1023 #singles.append("close")
1024 #We omit close because it doesn'r raise an exception on some platforms
1025 def get_single(f):
1026 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001027 if hasattr(os, f):
1028 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001029 return helper
1030 for f in singles:
1031 locals()["test_"+f] = get_single(f)
1032
Benjamin Peterson7522c742009-01-19 21:00:09 +00001033 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001034 try:
1035 f(support.make_bad_fd(), *args)
1036 except OSError as e:
1037 self.assertEqual(e.errno, errno.EBADF)
1038 else:
1039 self.fail("%r didn't raise a OSError with a bad file descriptor"
1040 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001041
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001042 def test_isatty(self):
1043 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001044 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001045
1046 def test_closerange(self):
1047 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001048 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +00001049 # Make sure none of the descriptors we are about to close are
1050 # currently valid (issue 6542).
1051 for i in range(10):
1052 try: os.fstat(fd+i)
1053 except OSError:
1054 pass
1055 else:
1056 break
1057 if i < 2:
1058 raise unittest.SkipTest(
1059 "Unable to acquire a range of invalid file descriptors")
1060 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001061
1062 def test_dup2(self):
1063 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001064 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001065
1066 def test_fchmod(self):
1067 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001068 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001069
1070 def test_fchown(self):
1071 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001072 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001073
1074 def test_fpathconf(self):
1075 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001076 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001077
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001078 def test_ftruncate(self):
1079 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001080 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001081
1082 def test_lseek(self):
1083 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001084 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001085
1086 def test_read(self):
1087 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001088 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001089
1090 def test_tcsetpgrpt(self):
1091 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001092 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001093
1094 def test_write(self):
1095 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001096 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001097
Brian Curtin1b9df392010-11-24 20:24:31 +00001098
1099class LinkTests(unittest.TestCase):
1100 def setUp(self):
1101 self.file1 = support.TESTFN
1102 self.file2 = os.path.join(support.TESTFN + "2")
1103
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001104 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001105 for file in (self.file1, self.file2):
1106 if os.path.exists(file):
1107 os.unlink(file)
1108
Brian Curtin1b9df392010-11-24 20:24:31 +00001109 def _test_link(self, file1, file2):
1110 with open(file1, "w") as f1:
1111 f1.write("test")
1112
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001113 with warnings.catch_warnings():
1114 warnings.simplefilter("ignore", DeprecationWarning)
1115 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001116 with open(file1, "r") as f1, open(file2, "r") as f2:
1117 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1118
1119 def test_link(self):
1120 self._test_link(self.file1, self.file2)
1121
1122 def test_link_bytes(self):
1123 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1124 bytes(self.file2, sys.getfilesystemencoding()))
1125
Brian Curtinf498b752010-11-30 15:54:04 +00001126 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001127 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001128 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001129 except UnicodeError:
1130 raise unittest.SkipTest("Unable to encode for this platform.")
1131
Brian Curtinf498b752010-11-30 15:54:04 +00001132 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001133 self.file2 = self.file1 + "2"
1134 self._test_link(self.file1, self.file2)
1135
Thomas Wouters477c8d52006-05-27 19:21:47 +00001136if sys.platform != 'win32':
1137 class Win32ErrorTests(unittest.TestCase):
1138 pass
1139
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001140 class PosixUidGidTests(unittest.TestCase):
1141 if hasattr(os, 'setuid'):
1142 def test_setuid(self):
1143 if os.getuid() != 0:
1144 self.assertRaises(os.error, os.setuid, 0)
1145 self.assertRaises(OverflowError, os.setuid, 1<<32)
1146
1147 if hasattr(os, 'setgid'):
1148 def test_setgid(self):
1149 if os.getuid() != 0:
1150 self.assertRaises(os.error, os.setgid, 0)
1151 self.assertRaises(OverflowError, os.setgid, 1<<32)
1152
1153 if hasattr(os, 'seteuid'):
1154 def test_seteuid(self):
1155 if os.getuid() != 0:
1156 self.assertRaises(os.error, os.seteuid, 0)
1157 self.assertRaises(OverflowError, os.seteuid, 1<<32)
1158
1159 if hasattr(os, 'setegid'):
1160 def test_setegid(self):
1161 if os.getuid() != 0:
1162 self.assertRaises(os.error, os.setegid, 0)
1163 self.assertRaises(OverflowError, os.setegid, 1<<32)
1164
1165 if hasattr(os, 'setreuid'):
1166 def test_setreuid(self):
1167 if os.getuid() != 0:
1168 self.assertRaises(os.error, os.setreuid, 0, 0)
1169 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1170 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001171
1172 def test_setreuid_neg1(self):
1173 # Needs to accept -1. We run this in a subprocess to avoid
1174 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001175 subprocess.check_call([
1176 sys.executable, '-c',
1177 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001178
1179 if hasattr(os, 'setregid'):
1180 def test_setregid(self):
1181 if os.getuid() != 0:
1182 self.assertRaises(os.error, os.setregid, 0, 0)
1183 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1184 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001185
1186 def test_setregid_neg1(self):
1187 # Needs to accept -1. We run this in a subprocess to avoid
1188 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001189 subprocess.check_call([
1190 sys.executable, '-c',
1191 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001192
1193 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001194 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001195 if support.TESTFN_UNENCODABLE:
1196 self.dir = support.TESTFN_UNENCODABLE
1197 else:
1198 self.dir = support.TESTFN
1199 self.bdir = os.fsencode(self.dir)
1200
1201 bytesfn = []
1202 def add_filename(fn):
1203 try:
1204 fn = os.fsencode(fn)
1205 except UnicodeEncodeError:
1206 return
1207 bytesfn.append(fn)
1208 add_filename(support.TESTFN_UNICODE)
1209 if support.TESTFN_UNENCODABLE:
1210 add_filename(support.TESTFN_UNENCODABLE)
1211 if not bytesfn:
1212 self.skipTest("couldn't create any non-ascii filename")
1213
1214 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001215 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001216 try:
1217 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +02001218 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +00001219 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001220 if fn in self.unicodefn:
1221 raise ValueError("duplicate filename")
1222 self.unicodefn.add(fn)
1223 except:
1224 shutil.rmtree(self.dir)
1225 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001226
1227 def tearDown(self):
1228 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001229
1230 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001231 expected = self.unicodefn
1232 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001233 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001234
1235 def test_open(self):
1236 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001237 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +00001238 f.close()
1239
1240 def test_stat(self):
1241 for fn in self.unicodefn:
1242 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001243else:
1244 class PosixUidGidTests(unittest.TestCase):
1245 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001246 class Pep383Tests(unittest.TestCase):
1247 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001248
Brian Curtineb24d742010-04-12 17:16:38 +00001249@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1250class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001251 def _kill(self, sig):
1252 # Start sys.executable as a subprocess and communicate from the
1253 # subprocess to the parent that the interpreter is ready. When it
1254 # becomes ready, send *sig* via os.kill to the subprocess and check
1255 # that the return code is equal to *sig*.
1256 import ctypes
1257 from ctypes import wintypes
1258 import msvcrt
1259
1260 # Since we can't access the contents of the process' stdout until the
1261 # process has exited, use PeekNamedPipe to see what's inside stdout
1262 # without waiting. This is done so we can tell that the interpreter
1263 # is started and running at a point where it could handle a signal.
1264 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1265 PeekNamedPipe.restype = wintypes.BOOL
1266 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1267 ctypes.POINTER(ctypes.c_char), # stdout buf
1268 wintypes.DWORD, # Buffer size
1269 ctypes.POINTER(wintypes.DWORD), # bytes read
1270 ctypes.POINTER(wintypes.DWORD), # bytes avail
1271 ctypes.POINTER(wintypes.DWORD)) # bytes left
1272 msg = "running"
1273 proc = subprocess.Popen([sys.executable, "-c",
1274 "import sys;"
1275 "sys.stdout.write('{}');"
1276 "sys.stdout.flush();"
1277 "input()".format(msg)],
1278 stdout=subprocess.PIPE,
1279 stderr=subprocess.PIPE,
1280 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001281 self.addCleanup(proc.stdout.close)
1282 self.addCleanup(proc.stderr.close)
1283 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001284
1285 count, max = 0, 100
1286 while count < max and proc.poll() is None:
1287 # Create a string buffer to store the result of stdout from the pipe
1288 buf = ctypes.create_string_buffer(len(msg))
1289 # Obtain the text currently in proc.stdout
1290 # Bytes read/avail/left are left as NULL and unused
1291 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1292 buf, ctypes.sizeof(buf), None, None, None)
1293 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1294 if buf.value:
1295 self.assertEqual(msg, buf.value.decode())
1296 break
1297 time.sleep(0.1)
1298 count += 1
1299 else:
1300 self.fail("Did not receive communication from the subprocess")
1301
Brian Curtineb24d742010-04-12 17:16:38 +00001302 os.kill(proc.pid, sig)
1303 self.assertEqual(proc.wait(), sig)
1304
1305 def test_kill_sigterm(self):
1306 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001307 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001308
1309 def test_kill_int(self):
1310 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001311 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001312
1313 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001314 tagname = "test_os_%s" % uuid.uuid1()
1315 m = mmap.mmap(-1, 1, tagname)
1316 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001317 # Run a script which has console control handling enabled.
1318 proc = subprocess.Popen([sys.executable,
1319 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001320 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001321 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1322 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001323 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001324 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001325 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001326 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001327 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001328 count += 1
1329 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001330 # Forcefully kill the process if we weren't able to signal it.
1331 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001332 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001333 os.kill(proc.pid, event)
1334 # proc.send_signal(event) could also be done here.
1335 # Allow time for the signal to be passed and the process to exit.
1336 time.sleep(0.5)
1337 if not proc.poll():
1338 # Forcefully kill the process if we weren't able to signal it.
1339 os.kill(proc.pid, signal.SIGINT)
1340 self.fail("subprocess did not stop on {}".format(name))
1341
1342 @unittest.skip("subprocesses aren't inheriting CTRL+C property")
1343 def test_CTRL_C_EVENT(self):
1344 from ctypes import wintypes
1345 import ctypes
1346
1347 # Make a NULL value by creating a pointer with no argument.
1348 NULL = ctypes.POINTER(ctypes.c_int)()
1349 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1350 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1351 wintypes.BOOL)
1352 SetConsoleCtrlHandler.restype = wintypes.BOOL
1353
1354 # Calling this with NULL and FALSE causes the calling process to
1355 # handle CTRL+C, rather than ignore it. This property is inherited
1356 # by subprocesses.
1357 SetConsoleCtrlHandler(NULL, 0)
1358
1359 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1360
1361 def test_CTRL_BREAK_EVENT(self):
1362 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1363
1364
Brian Curtind40e6f72010-07-08 21:39:08 +00001365@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001366@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001367class Win32SymlinkTests(unittest.TestCase):
1368 filelink = 'filelinktest'
1369 filelink_target = os.path.abspath(__file__)
1370 dirlink = 'dirlinktest'
1371 dirlink_target = os.path.dirname(filelink_target)
1372 missing_link = 'missing link'
1373
1374 def setUp(self):
1375 assert os.path.exists(self.dirlink_target)
1376 assert os.path.exists(self.filelink_target)
1377 assert not os.path.exists(self.dirlink)
1378 assert not os.path.exists(self.filelink)
1379 assert not os.path.exists(self.missing_link)
1380
1381 def tearDown(self):
1382 if os.path.exists(self.filelink):
1383 os.remove(self.filelink)
1384 if os.path.exists(self.dirlink):
1385 os.rmdir(self.dirlink)
1386 if os.path.lexists(self.missing_link):
1387 os.remove(self.missing_link)
1388
1389 def test_directory_link(self):
Antoine Pitrou5311c1d2012-01-24 08:59:28 +01001390 os.symlink(self.dirlink_target, self.dirlink, True)
Brian Curtind40e6f72010-07-08 21:39:08 +00001391 self.assertTrue(os.path.exists(self.dirlink))
1392 self.assertTrue(os.path.isdir(self.dirlink))
1393 self.assertTrue(os.path.islink(self.dirlink))
1394 self.check_stat(self.dirlink, self.dirlink_target)
1395
1396 def test_file_link(self):
1397 os.symlink(self.filelink_target, self.filelink)
1398 self.assertTrue(os.path.exists(self.filelink))
1399 self.assertTrue(os.path.isfile(self.filelink))
1400 self.assertTrue(os.path.islink(self.filelink))
1401 self.check_stat(self.filelink, self.filelink_target)
1402
1403 def _create_missing_dir_link(self):
1404 'Create a "directory" link to a non-existent target'
1405 linkname = self.missing_link
1406 if os.path.lexists(linkname):
1407 os.remove(linkname)
1408 target = r'c:\\target does not exist.29r3c740'
1409 assert not os.path.exists(target)
1410 target_is_dir = True
1411 os.symlink(target, linkname, target_is_dir)
1412
1413 def test_remove_directory_link_to_missing_target(self):
1414 self._create_missing_dir_link()
1415 # For compatibility with Unix, os.remove will check the
1416 # directory status and call RemoveDirectory if the symlink
1417 # was created with target_is_dir==True.
1418 os.remove(self.missing_link)
1419
1420 @unittest.skip("currently fails; consider for improvement")
1421 def test_isdir_on_directory_link_to_missing_target(self):
1422 self._create_missing_dir_link()
1423 # consider having isdir return true for directory links
1424 self.assertTrue(os.path.isdir(self.missing_link))
1425
1426 @unittest.skip("currently fails; consider for improvement")
1427 def test_rmdir_on_directory_link_to_missing_target(self):
1428 self._create_missing_dir_link()
1429 # consider allowing rmdir to remove directory links
1430 os.rmdir(self.missing_link)
1431
1432 def check_stat(self, link, target):
1433 self.assertEqual(os.stat(link), os.stat(target))
1434 self.assertNotEqual(os.lstat(link), os.stat(link))
1435
Brian Curtind25aef52011-06-13 15:16:04 -05001436 bytes_link = os.fsencode(link)
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001437 with warnings.catch_warnings():
1438 warnings.simplefilter("ignore", DeprecationWarning)
1439 self.assertEqual(os.stat(bytes_link), os.stat(target))
1440 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05001441
1442 def test_12084(self):
1443 level1 = os.path.abspath(support.TESTFN)
1444 level2 = os.path.join(level1, "level2")
1445 level3 = os.path.join(level2, "level3")
1446 try:
1447 os.mkdir(level1)
1448 os.mkdir(level2)
1449 os.mkdir(level3)
1450
1451 file1 = os.path.abspath(os.path.join(level1, "file1"))
1452
1453 with open(file1, "w") as f:
1454 f.write("file1")
1455
1456 orig_dir = os.getcwd()
1457 try:
1458 os.chdir(level2)
1459 link = os.path.join(level2, "link")
1460 os.symlink(os.path.relpath(file1), "link")
1461 self.assertIn("link", os.listdir(os.getcwd()))
1462
1463 # Check os.stat calls from the same dir as the link
1464 self.assertEqual(os.stat(file1), os.stat("link"))
1465
1466 # Check os.stat calls from a dir below the link
1467 os.chdir(level1)
1468 self.assertEqual(os.stat(file1),
1469 os.stat(os.path.relpath(link)))
1470
1471 # Check os.stat calls from a dir above the link
1472 os.chdir(level3)
1473 self.assertEqual(os.stat(file1),
1474 os.stat(os.path.relpath(link)))
1475 finally:
1476 os.chdir(orig_dir)
1477 except OSError as err:
1478 self.fail(err)
1479 finally:
1480 os.remove(file1)
1481 shutil.rmtree(level1)
1482
Brian Curtind40e6f72010-07-08 21:39:08 +00001483
Victor Stinnere8d51452010-08-19 01:05:19 +00001484class FSEncodingTests(unittest.TestCase):
1485 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001486 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
1487 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00001488
Victor Stinnere8d51452010-08-19 01:05:19 +00001489 def test_identity(self):
1490 # assert fsdecode(fsencode(x)) == x
1491 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
1492 try:
1493 bytesfn = os.fsencode(fn)
1494 except UnicodeEncodeError:
1495 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00001496 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00001497
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001498
Brett Cannonefb00c02012-02-29 18:31:31 -05001499
1500class DeviceEncodingTests(unittest.TestCase):
1501
1502 def test_bad_fd(self):
1503 # Return None when an fd doesn't actually exist.
1504 self.assertIsNone(os.device_encoding(123456))
1505
Philip Jenveye308b7c2012-02-29 16:16:15 -08001506 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
1507 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08001508 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05001509 def test_device_encoding(self):
1510 encoding = os.device_encoding(0)
1511 self.assertIsNotNone(encoding)
1512 self.assertTrue(codecs.lookup(encoding))
1513
1514
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001515class PidTests(unittest.TestCase):
1516 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
1517 def test_getppid(self):
1518 p = subprocess.Popen([sys.executable, '-c',
1519 'import os; print(os.getppid())'],
1520 stdout=subprocess.PIPE)
1521 stdout, _ = p.communicate()
1522 # We are the parent of our subprocess
1523 self.assertEqual(int(stdout), os.getpid())
1524
1525
Brian Curtin0151b8e2010-09-24 13:43:43 +00001526# The introduction of this TestCase caused at least two different errors on
1527# *nix buildbots. Temporarily skip this to let the buildbots move along.
1528@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001529@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
1530class LoginTests(unittest.TestCase):
1531 def test_getlogin(self):
1532 user_name = os.getlogin()
1533 self.assertNotEqual(len(user_name), 0)
1534
1535
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001536@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
1537 "needs os.getpriority and os.setpriority")
1538class ProgramPriorityTests(unittest.TestCase):
1539 """Tests for os.getpriority() and os.setpriority()."""
1540
1541 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001542
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001543 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
1544 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
1545 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00001546 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
1547 if base >= 19 and new_prio <= 19:
1548 raise unittest.SkipTest(
1549 "unable to reliably test setpriority at current nice level of %s" % base)
1550 else:
1551 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001552 finally:
1553 try:
1554 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
1555 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00001556 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001557 raise
1558
1559
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001560if threading is not None:
1561 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001562
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001563 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001564
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001565 def __init__(self, conn):
1566 asynchat.async_chat.__init__(self, conn)
1567 self.in_buffer = []
1568 self.closed = False
1569 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001570
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001571 def handle_read(self):
1572 data = self.recv(4096)
1573 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001574
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001575 def get_data(self):
1576 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001577
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001578 def handle_close(self):
1579 self.close()
1580 self.closed = True
1581
1582 def handle_error(self):
1583 raise
1584
1585 def __init__(self, address):
1586 threading.Thread.__init__(self)
1587 asyncore.dispatcher.__init__(self)
1588 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
1589 self.bind(address)
1590 self.listen(5)
1591 self.host, self.port = self.socket.getsockname()[:2]
1592 self.handler_instance = None
1593 self._active = False
1594 self._active_lock = threading.Lock()
1595
1596 # --- public API
1597
1598 @property
1599 def running(self):
1600 return self._active
1601
1602 def start(self):
1603 assert not self.running
1604 self.__flag = threading.Event()
1605 threading.Thread.start(self)
1606 self.__flag.wait()
1607
1608 def stop(self):
1609 assert self.running
1610 self._active = False
1611 self.join()
1612
1613 def wait(self):
1614 # wait for handler connection to be closed, then stop the server
1615 while not getattr(self.handler_instance, "closed", False):
1616 time.sleep(0.001)
1617 self.stop()
1618
1619 # --- internals
1620
1621 def run(self):
1622 self._active = True
1623 self.__flag.set()
1624 while self._active and asyncore.socket_map:
1625 self._active_lock.acquire()
1626 asyncore.loop(timeout=0.001, count=1)
1627 self._active_lock.release()
1628 asyncore.close_all()
1629
1630 def handle_accept(self):
1631 conn, addr = self.accept()
1632 self.handler_instance = self.Handler(conn)
1633
1634 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001635 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02001636 handle_read = handle_connect
1637
1638 def writable(self):
1639 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001640
1641 def handle_error(self):
1642 raise
1643
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001644
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001645@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001646@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1647class TestSendfile(unittest.TestCase):
1648
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001649 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001650 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001651 not sys.platform.startswith("solaris") and \
1652 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001653
1654 @classmethod
1655 def setUpClass(cls):
1656 with open(support.TESTFN, "wb") as f:
1657 f.write(cls.DATA)
1658
1659 @classmethod
1660 def tearDownClass(cls):
1661 support.unlink(support.TESTFN)
1662
1663 def setUp(self):
1664 self.server = SendfileTestServer((support.HOST, 0))
1665 self.server.start()
1666 self.client = socket.socket()
1667 self.client.connect((self.server.host, self.server.port))
1668 self.client.settimeout(1)
1669 # synchronize by waiting for "220 ready" response
1670 self.client.recv(1024)
1671 self.sockno = self.client.fileno()
1672 self.file = open(support.TESTFN, 'rb')
1673 self.fileno = self.file.fileno()
1674
1675 def tearDown(self):
1676 self.file.close()
1677 self.client.close()
1678 if self.server.running:
1679 self.server.stop()
1680
1681 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1682 """A higher level wrapper representing how an application is
1683 supposed to use sendfile().
1684 """
1685 while 1:
1686 try:
1687 if self.SUPPORT_HEADERS_TRAILERS:
1688 return os.sendfile(sock, file, offset, nbytes, headers,
1689 trailers)
1690 else:
1691 return os.sendfile(sock, file, offset, nbytes)
1692 except OSError as err:
1693 if err.errno == errno.ECONNRESET:
1694 # disconnected
1695 raise
1696 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1697 # we have to retry send data
1698 continue
1699 else:
1700 raise
1701
1702 def test_send_whole_file(self):
1703 # normal send
1704 total_sent = 0
1705 offset = 0
1706 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001707 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001708 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1709 if sent == 0:
1710 break
1711 offset += sent
1712 total_sent += sent
1713 self.assertTrue(sent <= nbytes)
1714 self.assertEqual(offset, total_sent)
1715
1716 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001717 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001718 self.client.close()
1719 self.server.wait()
1720 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001721 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001722 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001723
1724 def test_send_at_certain_offset(self):
1725 # start sending a file at a certain offset
1726 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001727 offset = len(self.DATA) // 2
1728 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001729 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001730 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001731 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
1732 if sent == 0:
1733 break
1734 offset += sent
1735 total_sent += sent
1736 self.assertTrue(sent <= nbytes)
1737
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001738 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001739 self.client.close()
1740 self.server.wait()
1741 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001742 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001743 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001744 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001745 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001746
1747 def test_offset_overflow(self):
1748 # specify an offset > file size
1749 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001750 try:
1751 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
1752 except OSError as e:
1753 # Solaris can raise EINVAL if offset >= file length, ignore.
1754 if e.errno != errno.EINVAL:
1755 raise
1756 else:
1757 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00001758 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001759 self.client.close()
1760 self.server.wait()
1761 data = self.server.handler_instance.get_data()
1762 self.assertEqual(data, b'')
1763
1764 def test_invalid_offset(self):
1765 with self.assertRaises(OSError) as cm:
1766 os.sendfile(self.sockno, self.fileno, -1, 4096)
1767 self.assertEqual(cm.exception.errno, errno.EINVAL)
1768
1769 # --- headers / trailers tests
1770
1771 if SUPPORT_HEADERS_TRAILERS:
1772
1773 def test_headers(self):
1774 total_sent = 0
1775 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
1776 headers=[b"x" * 512])
1777 total_sent += sent
1778 offset = 4096
1779 nbytes = 4096
1780 while 1:
1781 sent = self.sendfile_wrapper(self.sockno, self.fileno,
1782 offset, nbytes)
1783 if sent == 0:
1784 break
1785 total_sent += sent
1786 offset += sent
1787
1788 expected_data = b"x" * 512 + self.DATA
1789 self.assertEqual(total_sent, len(expected_data))
1790 self.client.close()
1791 self.server.wait()
1792 data = self.server.handler_instance.get_data()
1793 self.assertEqual(hash(data), hash(expected_data))
1794
1795 def test_trailers(self):
1796 TESTFN2 = support.TESTFN + "2"
Brett Cannonb6376802011-03-15 17:38:22 -04001797 with open(TESTFN2, 'wb') as f:
1798 f.write(b"abcde")
1799 with open(TESTFN2, 'rb')as f:
1800 self.addCleanup(os.remove, TESTFN2)
1801 os.sendfile(self.sockno, f.fileno(), 0, 4096,
1802 trailers=[b"12345"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001803 self.client.close()
1804 self.server.wait()
1805 data = self.server.handler_instance.get_data()
1806 self.assertEqual(data, b"abcde12345")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001807
1808 if hasattr(os, "SF_NODISKIO"):
1809 def test_flags(self):
1810 try:
1811 os.sendfile(self.sockno, self.fileno, 0, 4096,
1812 flags=os.SF_NODISKIO)
1813 except OSError as err:
1814 if err.errno not in (errno.EBUSY, errno.EAGAIN):
1815 raise
1816
1817
Antoine Pitrou424246f2012-05-12 19:02:01 +02001818@support.skip_unless_xattr
Benjamin Peterson799bd802011-08-31 22:15:17 -04001819class ExtendedAttributeTests(unittest.TestCase):
1820
1821 def tearDown(self):
1822 support.unlink(support.TESTFN)
1823
1824 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
1825 fn = support.TESTFN
1826 open(fn, "wb").close()
1827 with self.assertRaises(OSError) as cm:
1828 getxattr(fn, s("user.test"))
1829 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001830 init_xattr = listxattr(fn)
1831 self.assertIsInstance(init_xattr, list)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001832 setxattr(fn, s("user.test"), b"")
Victor Stinnerf12e5062011-10-16 22:12:03 +02001833 xattr = set(init_xattr)
1834 xattr.add("user.test")
1835 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001836 self.assertEqual(getxattr(fn, b"user.test"), b"")
1837 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE)
1838 self.assertEqual(getxattr(fn, b"user.test"), b"hello")
1839 with self.assertRaises(OSError) as cm:
1840 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE)
1841 self.assertEqual(cm.exception.errno, errno.EEXIST)
1842 with self.assertRaises(OSError) as cm:
1843 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE)
1844 self.assertEqual(cm.exception.errno, errno.ENODATA)
1845 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001846 xattr.add("user.test2")
1847 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001848 removexattr(fn, s("user.test"))
1849 with self.assertRaises(OSError) as cm:
1850 getxattr(fn, s("user.test"))
1851 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerf12e5062011-10-16 22:12:03 +02001852 xattr.remove("user.test")
1853 self.assertEqual(set(listxattr(fn)), xattr)
Benjamin Peterson799bd802011-08-31 22:15:17 -04001854 self.assertEqual(getxattr(fn, s("user.test2")), b"foo")
1855 setxattr(fn, s("user.test"), b"a"*1024)
1856 self.assertEqual(getxattr(fn, s("user.test")), b"a"*1024)
1857 removexattr(fn, s("user.test"))
1858 many = sorted("user.test{}".format(i) for i in range(100))
1859 for thing in many:
1860 setxattr(fn, thing, b"x")
Victor Stinnerf12e5062011-10-16 22:12:03 +02001861 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04001862
1863 def _check_xattrs(self, *args):
1864 def make_bytes(s):
1865 return bytes(s, "ascii")
1866 self._check_xattrs_str(str, *args)
1867 support.unlink(support.TESTFN)
1868 self._check_xattrs_str(make_bytes, *args)
1869
1870 def test_simple(self):
1871 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
1872 os.listxattr)
1873
1874 def test_lpath(self):
1875 self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
1876 os.llistxattr)
1877
1878 def test_fds(self):
1879 def getxattr(path, *args):
1880 with open(path, "rb") as fp:
1881 return os.fgetxattr(fp.fileno(), *args)
1882 def setxattr(path, *args):
1883 with open(path, "wb") as fp:
1884 os.fsetxattr(fp.fileno(), *args)
1885 def removexattr(path, *args):
1886 with open(path, "wb") as fp:
1887 os.fremovexattr(fp.fileno(), *args)
1888 def listxattr(path, *args):
1889 with open(path, "rb") as fp:
1890 return os.flistxattr(fp.fileno(), *args)
1891 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1892
1893
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001894@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1895class Win32DeprecatedBytesAPI(unittest.TestCase):
1896 def test_deprecated(self):
1897 import nt
1898 filename = os.fsencode(support.TESTFN)
1899 with warnings.catch_warnings():
1900 warnings.simplefilter("error", DeprecationWarning)
1901 for func, *args in (
1902 (nt._getfullpathname, filename),
1903 (nt._isdir, filename),
1904 (os.access, filename, os.R_OK),
1905 (os.chdir, filename),
1906 (os.chmod, filename, 0o777),
Victor Stinnerf7c5ae22011-11-16 23:43:07 +01001907 (os.getcwdb,),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001908 (os.link, filename, filename),
1909 (os.listdir, filename),
1910 (os.lstat, filename),
1911 (os.mkdir, filename),
1912 (os.open, filename, os.O_RDONLY),
1913 (os.rename, filename, filename),
1914 (os.rmdir, filename),
1915 (os.startfile, filename),
1916 (os.stat, filename),
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001917 (os.unlink, filename),
1918 (os.utime, filename),
1919 ):
1920 self.assertRaises(DeprecationWarning, func, *args)
1921
Victor Stinner28216442011-11-16 00:34:44 +01001922 @support.skip_unless_symlink
1923 def test_symlink(self):
1924 filename = os.fsencode(support.TESTFN)
1925 with warnings.catch_warnings():
1926 warnings.simplefilter("error", DeprecationWarning)
1927 self.assertRaises(DeprecationWarning,
1928 os.symlink, filename, filename)
1929
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001930
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001931@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
1932class TermsizeTests(unittest.TestCase):
1933 def test_does_not_crash(self):
1934 """Check if get_terminal_size() returns a meaningful value.
1935
1936 There's no easy portable way to actually check the size of the
1937 terminal, so let's check if it returns something sensible instead.
1938 """
1939 try:
1940 size = os.get_terminal_size()
1941 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01001942 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001943 # Under win32 a generic OSError can be thrown if the
1944 # handle cannot be retrieved
1945 self.skipTest("failed to query terminal size")
1946 raise
1947
Antoine Pitroucfade362012-02-08 23:48:59 +01001948 self.assertGreaterEqual(size.columns, 0)
1949 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001950
1951 def test_stty_match(self):
1952 """Check if stty returns the same results
1953
1954 stty actually tests stdin, so get_terminal_size is invoked on
1955 stdin explicitly. If stty succeeded, then get_terminal_size()
1956 should work too.
1957 """
1958 try:
1959 size = subprocess.check_output(['stty', 'size']).decode().split()
1960 except (FileNotFoundError, subprocess.CalledProcessError):
1961 self.skipTest("stty invocation failed")
1962 expected = (int(size[1]), int(size[0])) # reversed order
1963
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01001964 try:
1965 actual = os.get_terminal_size(sys.__stdin__.fileno())
1966 except OSError as e:
1967 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
1968 # Under win32 a generic OSError can be thrown if the
1969 # handle cannot be retrieved
1970 self.skipTest("failed to query terminal size")
1971 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01001972 self.assertEqual(expected, actual)
1973
1974
Antoine Pitrouf26ad712011-07-15 23:00:56 +02001975@support.reap_threads
Fred Drake2e2be372001-09-20 21:33:42 +00001976def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001977 support.run_unittest(
Thomas Wouters0e3f5912006-08-11 14:57:12 +00001978 FileTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00001979 StatAttributeTests,
1980 EnvironTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001981 WalkTests,
Charles-François Natali7372b062012-02-05 15:15:38 +01001982 FwalkTests,
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001983 MakedirTests,
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001984 DevNullTests,
Thomas Wouters477c8d52006-05-27 19:21:47 +00001985 URandomTests,
Guido van Rossume7ba4952007-06-06 23:52:48 +00001986 ExecTests,
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001987 Win32ErrorTests,
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001988 TestInvalidFD,
Martin v. Löwis011e8422009-05-05 04:43:17 +00001989 PosixUidGidTests,
Brian Curtineb24d742010-04-12 17:16:38 +00001990 Pep383Tests,
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001991 Win32KillTests,
Brian Curtind40e6f72010-07-08 21:39:08 +00001992 Win32SymlinkTests,
Victor Stinnere8d51452010-08-19 01:05:19 +00001993 FSEncodingTests,
Brett Cannonefb00c02012-02-29 18:31:31 -05001994 DeviceEncodingTests,
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00001995 PidTests,
Brian Curtine8e4b3b2010-09-23 20:04:14 +00001996 LoginTests,
Brian Curtin1b9df392010-11-24 20:24:31 +00001997 LinkTests,
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001998 TestSendfile,
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00001999 ProgramPriorityTests,
Benjamin Peterson799bd802011-08-31 22:15:17 -04002000 ExtendedAttributeTests,
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002001 Win32DeprecatedBytesAPI,
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002002 TermsizeTests,
Walter Dörwald21d3a322003-05-01 17:45:56 +00002003 )
Fred Drake2e2be372001-09-20 21:33:42 +00002004
2005if __name__ == "__main__":
2006 test_main()