blob: 066bf7231e3ff7167ea723b54af18aeb038faf08 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000027try:
28 import threading
29except ImportError:
30 threading = None
Georg Brandl2daf6ae2012-02-20 19:54:16 +010031from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000032
# Ask os.stat() for float timestamps.  os.stat_float_times() is a deprecated
# switch that is not available on every Python version, so only call it when
# it exists; without it the st_?time attributes are used as returned.
if hasattr(os, "stat_float_times"):
    os.stat_float_times(True)
st = os.stat(__file__)
stat_supports_subsecond = (
    # check if float and int timestamps are different
    (st.st_atime != st[7])
    or (st.st_mtime != st[8])
    or (st.st_ctime != st[9]))
40
# True only on Linux systems whose threading library is the (now outdated
# and unmaintained) linuxthreads.  Combining linuxthreads with a failed
# execv call is known to be problematic: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    hasattr(sys, 'thread_info')
    and bool(sys.thread_info.version)
    and sys.thread_info.version.startswith("linuxthreads"))
Brian Curtineb24d742010-04-12 17:16:38 +000049
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file functions that create support.TESTFN."""

    def setUp(self):
        # Start (and, via tearDown, finish) every test without TESTFN.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A freshly created file must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count
        # of its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() returns exactly the bytes written, as a bytes object.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion above fails, so
            # it is not leaked and tearDown can still remove TESTFN.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Helper: run the command given by *args* and require exit code 0.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Helper: open TESTFN read-only and wrap the descriptor with
        # os.fdopen(fd, *args); closing the file object releases the fd.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination and remove
        # the source.
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
156
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200157
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for os.stat()/os.statvfs() results and the utime() family.

    Each test runs against a directory support.TESTFN containing one
    three-byte file, self.fname.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        # Shared checks on the os.stat() result for *fname* (str or bytes).
        if not hasattr(os, "stat"):
            return

        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                if name.endswith("TIME"):
                    # The tuple items hold integer timestamps while the
                    # attributes may be floats, so compare truncated values.
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                 result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        self.assertRaises(TypeError, os.stat_result, (10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        # Same checks with a bytes filename; skipped when the name cannot
        # be encoded with the filesystem encoding.
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            return

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                return
            # Any other failure is a real error; do not fall through with
            # 'result' unbound.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        self.assertRaises(TypeError, os.statvfs_result, (10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def _test_utime(self, filename, attr, utime, delta):
        # Helper for the utime tests.
        #   filename: path whose timestamps are set and read back
        #   attr:     callable (stat_result, name) -> timestamp value
        #   utime:    callable (filename, times) that sets the timestamps
        #   delta:    tolerated mtime difference between two "set to now"
        #             calls, in the unit returned by attr
        #
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Stat the path that was just touched (not unconditionally the
        # TESTFN directory) so st2 and st3 describe the same object.
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"), delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)

    def _test_utime_ns(self, set_times_ns, test_dir=True):
        # Run _test_utime() against the st_?time_ns fields, on the file
        # and (unless test_dir is false) on the TESTFN directory.
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    requires_lutimes = unittest.skipUnless(hasattr(os, 'lutimes'),
                            "os.lutimes required for this test.")
    requires_futimes = unittest.skipUnless(hasattr(os, 'futimes'),
                            "os.futimes required for this test.")

    @requires_lutimes
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.lutimes(file, ns=times)
        self._test_utime_ns(lutimes_ns)

    @requires_futimes
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.futimes(f.fileno(), ns=times)
        # futimes_ns opens its target as a regular file, so the directory
        # pass is disabled.
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        # Passing both a times tuple and ns= to os.<name> must be rejected.
        with self.assertRaises(RuntimeError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)

    @requires_lutimes
    def test_lutimes_invalid_arguments(self):
        self._utime_invalid_arguments('lutimes', self.fname)

    @requires_futimes
    def test_futimes_invalid_arguments(self):
        with open(self.fname, "wb") as f:
            self._utime_invalid_arguments('futimes', f.fileno())

    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        # Helper: set sub-second atime/mtime on self.fname through
        # set_time_func(filename, atime, mtime) and check that os.stat()
        # reports them to millisecond precision.
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        # os.stat_float_times() is a deprecated switch that is not
        # available on every Python version; only call it when present.
        if hasattr(os, "stat_float_times"):
            os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_futimes
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.futimes(f.fileno(), (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimens'),
                         "os.futimens required for this test.")
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                # Split each float timestamp into (seconds, nanoseconds).
                asec, ansec = divmod(atime, 1.0)
                asec = int(asec)
                ansec = int(ansec * 1e9)
                msec, mnsec = divmod(mtime, 1.0)
                msec = int(msec)
                mnsec = int(mnsec * 1e9)
                os.futimens(f.fileno(),
                           (asec, ansec),
                           (msec, mnsec))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimesat'),
                         "os.futimesat required for this test.")
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.futimesat(dirfd, os.path.basename(filename),
                             (atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_lutimes
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.lutimes(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'utimensat'),
                         "os.utimensat required for this test.")
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # Split each float timestamp into (seconds, nanoseconds).
                asec, ansec = divmod(atime, 1.0)
                asec = int(asec)
                ansec = int(ansec * 1e9)
                msec, mnsec = divmod(mtime, 1.0)
                msec = int(msec)
                mnsec = int(mnsec * 1e9)
                os.utimensat(dirfd, os.path.basename(filename),
                             (asec, ansec),
                             (msec, mnsec))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Return the file system name of the volume holding *path*,
            # or None when GetVolumeInformationW() reports failure.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
493
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000494from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000495
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """check that os.environ object conform to mapping protocol"""
    type2test = None

    def setUp(self):
        # Snapshot the environment so tearDown can restore it, then load
        # the reference mapping into os.environ.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    # Skipped (rather than silently passing) when there is no /bin/sh.
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    # Skipped (rather than silently passing) when there is no /bin/sh.
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_os_popen_iter(self):
        with os.popen(
            "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
            it = iter(popen)
            self.assertEqual(next(it), "line1\n")
            self.assertEqual(next(it), "line2\n")
            self.assertEqual(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily replace os.environ with a plain dict; the original
        # object is put back in the finally clause.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100636
class WalkTests(unittest.TestCase):
    """Tests for os.walk().

    setUp() only builds the fixture tree below support.TESTFN.  The
    traversal assertions live in test_traversal() so that they are collected
    and reported as a test of their own, and so that a failing assertion
    cannot prevent tearDown() from removing the tree.
    """

    def setUp(self):
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TESTFN/TEST2
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        # Expected walk() entry for SUB2; it only lists "link" as a
        # directory when the platform lets us create the symlink.
        if support.can_symlink():
            link_target = os.path.abspath(t2_path)
            if os.name == 'nt':
                # Third positional argument marks the target as a directory.
                os.symlink(link_target, self.link_path, True)
            else:
                os.symlink(link_target, self.link_path)
            self.sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_traversal(self):
        walk_path = self.walk_path
        sub1_path = self.sub1_path
        sub11_path = self.sub11_path
        sub2_tree = self.sub2_tree

        # Walk top-down.
        entries = list(os.walk(walk_path))
        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = entries[0][1][0] != "SUB1"
        entries[0][1].sort()
        self.assertEqual(entries[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[1 + flipped],
                         (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 + flipped], (sub11_path, [], []))
        self.assertEqual(entries[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        entries = []
        for root, dirs, files in os.walk(walk_path):
            entries.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to entries!
                dirs.remove('SUB1')
        self.assertEqual(len(entries), 2)
        self.assertEqual(entries[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(entries[1], sub2_tree)

        # Walk bottom-up.
        entries = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(entries), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = entries[3][1][0] != "SUB1"
        entries[3][1].sort()
        self.assertEqual(entries[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(entries[flipped], (sub11_path, [], []))
        self.assertEqual(entries[flipped + 1],
                         (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(entries[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == self.link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory shows up in dirs but must be
                # unlinked, not rmdir'ed.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000749
Charles-François Natali7372b062012-02-05 15:15:38 +0100750
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def test_compare_to_walk(self):
        # Every (root, dirs, files) reported by fwalk() must match what
        # walk() reports for the same arguments, ignoring ordering.
        for topdown, followlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None, followlinks
            expected = {
                root: (set(dirs), set(files))
                for root, dirs, files in os.walk(*args)
            }

            for root, dirs, files, rootfd in os.fwalk(*args):
                self.assertIn(root, expected)
                found = (set(dirs), set(files))
                self.assertEqual(expected[root], found)

    def test_dir_fd(self):
        # Check the file descriptors handed back by fwalk().
        for topdown, followlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None, followlinks
            for root, dirs, files, rootfd in os.fwalk(*args):
                # fstat() raises if the descriptor is not valid
                os.fstat(rootfd)
                # flistdir() on the descriptor must agree with dirs + files
                listed = set(os.flistdir(rootfd))
                self.assertEqual(listed, set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        lowest_before = os.dup(1)
        os.close(lowest_before)
        for _ in range(256):
            for _entry in os.fwalk(support.TESTFN):
                pass
        lowest_after = os.dup(1)
        self.addCleanup(os.close, lowest_after)
        self.assertEqual(lowest_after, lowest_before)

    def tearDown(self):
        # Remove the tree bottom-up, addressing every entry relative to the
        # directory descriptor supplied by fwalk().
        walker = os.fwalk(support.TESTFN, topdown=False)
        for root, dirs, files, rootfd in walker:
            for name in files:
                os.unlinkat(rootfd, name)
            for name in dirs:
                # Do not follow symlinks: a link to a directory is unlinked
                # like a plain file.
                info = os.fstatat(rootfd, name, os.AT_SYMLINK_NOFOLLOW)
                if not stat.S_ISDIR(info.st_mode):
                    os.unlinkat(rootfd, name)
                else:
                    os.unlinkat(rootfd, name, os.AT_REMOVEDIR)
        os.rmdir(support.TESTFN)
802
803
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh support.TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5',
                            os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process-wide umask even when an assertion fails, so a
        # failure here cannot influence later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode,
                              exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776,
                              exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Always remove the regular file: tearDown() calls os.removedirs(),
        # which cannot remove a non-directory.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
855
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        # Writing to the null device must succeed; the with-statement closes
        # the file, so no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        # Reading it back yields nothing.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000863
Guido van Rossume7ba4952007-06-06 23:52:48 +0000864class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +0100865 def test_urandom_length(self):
866 self.assertEqual(len(os.urandom(0)), 0)
867 self.assertEqual(len(os.urandom(1)), 1)
868 self.assertEqual(len(os.urandom(10)), 10)
869 self.assertEqual(len(os.urandom(100)), 100)
870 self.assertEqual(len(os.urandom(1000)), 1000)
871
872 def test_urandom_value(self):
873 data1 = os.urandom(16)
874 data2 = os.urandom(16)
875 self.assertNotEqual(data1, data2)
876
877 def get_urandom_subprocess(self, count):
878 code = '\n'.join((
879 'import os, sys',
880 'data = os.urandom(%s)' % count,
881 'sys.stdout.buffer.write(data)',
882 'sys.stdout.buffer.flush()'))
883 out = assert_python_ok('-c', code)
884 stdout = out[1]
885 self.assertEqual(len(stdout), 16)
886 return stdout
887
888 def test_urandom_subprocess(self):
889 data1 = self.get_urandom_subprocess(16)
890 data2 = self.get_urandom_subprocess(16)
891 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000892
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Context manager that replaces os.execv and os.execve with recording stubs.

    The value bound by ``with ... as`` is a list that receives one
    (function name, first arg, remaining args) tuple per stubbed call.  The
    stubs always raise, since a real exec call would never return.  When
    *defpath* is given, os.defpath is overridden for the duration of the
    block.  All three attributes are restored on exit.
    """
    recorded = []

    def fake_execv(name, *args):
        recorded.append(('execv', name, args))
        raise RuntimeError("execv called")

    def fake_execve(name, *args):
        recorded.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Remember the real attributes before patching anything.
    saved_execv, saved_execve, saved_defpath = (
        os.execv, os.execve, os.defpath)
    try:
        os.execv, os.execve = fake_execv, fake_execve
        if defpath is not None:
            os.defpath = defpath
        yield recorded
    finally:
        os.execv = saved_execv
        os.execve = saved_execve
        os.defpath = saved_defpath
925
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() against the stubbed execv/execve, using either
        # str or bytes program names depending on *test_type*.
        program_path = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Everywhere but on Windows the recorded path is expected in
            # its fsencode()d form.
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        # The bytes variant is exercised as well, except on Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000989
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000990
class Win32ErrorTests(unittest.TestCase):
    """Each call below is expected to raise WindowsError."""

    def test_rename(self):
        with self.assertRaises(WindowsError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir() must fail while a regular file of that name exists.
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(WindowsError):
                os.mkdir(support.TESTFN)
        finally:
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001014
class TestInvalidFD(unittest.TestCase):
    """Check how os functions react to an invalid file descriptor.

    Most tests go through check(), which expects OSError with errno EBADF.
    Every test is a no-op when the os module lacks the function under test.
    """

    # os functions that take the file descriptor as their only argument.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build the test method for the os function named *f*.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    # Install one generated test_<name> method per entry in singles by
    # writing into the class-body namespace.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args) and require OSError with errno EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    def test_isatty(self):
        # isatty() is expected to return False rather than raise.
        if hasattr(os, "isatty"):
            self.assertEqual(os.isatty(support.make_bad_fd()), False)

    def test_closerange(self):
        if hasattr(os, "closerange"):
            fd = support.make_bad_fd()
            # Make sure none of the descriptors we are about to close are
            # currently valid (issue 6542).
            # The loop stops at the first offset i for which fd+i is a
            # valid descriptor (fstat succeeded).
            for i in range(10):
                try: os.fstat(fd+i)
                except OSError:
                    pass
                else:
                    break
            if i < 2:
                raise unittest.SkipTest(
                    "Unable to acquire a range of invalid file descriptors")
            self.assertEqual(os.closerange(fd, fd + i-1), None)

    def test_dup2(self):
        if hasattr(os, "dup2"):
            self.check(os.dup2, 20)

    def test_fchmod(self):
        if hasattr(os, "fchmod"):
            self.check(os.fchmod, 0)

    def test_fchown(self):
        if hasattr(os, "fchown"):
            self.check(os.fchown, -1, -1)

    def test_fpathconf(self):
        if hasattr(os, "fpathconf"):
            self.check(os.fpathconf, "PC_NAME_MAX")

    def test_ftruncate(self):
        if hasattr(os, "ftruncate"):
            self.check(os.ftruncate, 0)

    def test_lseek(self):
        if hasattr(os, "lseek"):
            self.check(os.lseek, 0, 0)

    def test_read(self):
        if hasattr(os, "read"):
            self.check(os.read, 1)

    def test_tcsetpgrpt(self):
        if hasattr(os, "tcsetpgrp"):
            self.check(os.tcsetpgrp, 0)

    def test_write(self):
        if hasattr(os, "write"):
            self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001092
Brian Curtin1b9df392010-11-24 20:24:31 +00001093
1094class LinkTests(unittest.TestCase):
1095 def setUp(self):
1096 self.file1 = support.TESTFN
1097 self.file2 = os.path.join(support.TESTFN + "2")
1098
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001099 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001100 for file in (self.file1, self.file2):
1101 if os.path.exists(file):
1102 os.unlink(file)
1103
Brian Curtin1b9df392010-11-24 20:24:31 +00001104 def _test_link(self, file1, file2):
1105 with open(file1, "w") as f1:
1106 f1.write("test")
1107
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001108 with warnings.catch_warnings():
1109 warnings.simplefilter("ignore", DeprecationWarning)
1110 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001111 with open(file1, "r") as f1, open(file2, "r") as f2:
1112 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1113
1114 def test_link(self):
1115 self._test_link(self.file1, self.file2)
1116
1117 def test_link_bytes(self):
1118 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1119 bytes(self.file2, sys.getfilesystemencoding()))
1120
Brian Curtinf498b752010-11-30 15:54:04 +00001121 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001122 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001123 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001124 except UnicodeError:
1125 raise unittest.SkipTest("Unable to encode for this platform.")
1126
Brian Curtinf498b752010-11-30 15:54:04 +00001127 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001128 self.file2 = self.file1 + "2"
1129 self._test_link(self.file1, self.file2)
1130
Thomas Wouters477c8d52006-05-27 19:21:47 +00001131if sys.platform != 'win32':
1132 class Win32ErrorTests(unittest.TestCase):
1133 pass
1134
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001135 class PosixUidGidTests(unittest.TestCase):
1136 if hasattr(os, 'setuid'):
1137 def test_setuid(self):
1138 if os.getuid() != 0:
1139 self.assertRaises(os.error, os.setuid, 0)
1140 self.assertRaises(OverflowError, os.setuid, 1<<32)
1141
1142 if hasattr(os, 'setgid'):
1143 def test_setgid(self):
1144 if os.getuid() != 0:
1145 self.assertRaises(os.error, os.setgid, 0)
1146 self.assertRaises(OverflowError, os.setgid, 1<<32)
1147
1148 if hasattr(os, 'seteuid'):
1149 def test_seteuid(self):
1150 if os.getuid() != 0:
1151 self.assertRaises(os.error, os.seteuid, 0)
1152 self.assertRaises(OverflowError, os.seteuid, 1<<32)
1153
1154 if hasattr(os, 'setegid'):
1155 def test_setegid(self):
1156 if os.getuid() != 0:
1157 self.assertRaises(os.error, os.setegid, 0)
1158 self.assertRaises(OverflowError, os.setegid, 1<<32)
1159
1160 if hasattr(os, 'setreuid'):
1161 def test_setreuid(self):
1162 if os.getuid() != 0:
1163 self.assertRaises(os.error, os.setreuid, 0, 0)
1164 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1165 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001166
1167 def test_setreuid_neg1(self):
1168 # Needs to accept -1. We run this in a subprocess to avoid
1169 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001170 subprocess.check_call([
1171 sys.executable, '-c',
1172 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001173
1174 if hasattr(os, 'setregid'):
1175 def test_setregid(self):
1176 if os.getuid() != 0:
1177 self.assertRaises(os.error, os.setregid, 0, 0)
1178 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1179 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001180
1181 def test_setregid_neg1(self):
1182 # Needs to accept -1. We run this in a subprocess to avoid
1183 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001184 subprocess.check_call([
1185 sys.executable, '-c',
1186 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +00001187
1188 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +00001189 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001190 if support.TESTFN_UNENCODABLE:
1191 self.dir = support.TESTFN_UNENCODABLE
1192 else:
1193 self.dir = support.TESTFN
1194 self.bdir = os.fsencode(self.dir)
1195
1196 bytesfn = []
1197 def add_filename(fn):
1198 try:
1199 fn = os.fsencode(fn)
1200 except UnicodeEncodeError:
1201 return
1202 bytesfn.append(fn)
1203 add_filename(support.TESTFN_UNICODE)
1204 if support.TESTFN_UNENCODABLE:
1205 add_filename(support.TESTFN_UNENCODABLE)
1206 if not bytesfn:
1207 self.skipTest("couldn't create any non-ascii filename")
1208
1209 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +00001210 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001211 try:
1212 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +02001213 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +00001214 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001215 if fn in self.unicodefn:
1216 raise ValueError("duplicate filename")
1217 self.unicodefn.add(fn)
1218 except:
1219 shutil.rmtree(self.dir)
1220 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001221
1222 def tearDown(self):
1223 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001224
1225 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001226 expected = self.unicodefn
1227 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001228 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001229
1230 def test_open(self):
1231 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001232 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +00001233 f.close()
1234
1235 def test_stat(self):
1236 for fn in self.unicodefn:
1237 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001238else:
1239 class PosixUidGidTests(unittest.TestCase):
1240 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +00001241 class Pep383Tests(unittest.TestCase):
1242 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001243
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows, using child interpreter processes."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        # The child writes msg to stdout, then blocks in input() until it
        # is killed.
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to 100 times, 0.1 s apart, while the child is alive.
        count, max = 0, 100
        while count < max and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached only when the loop ends without break: the attempts
            # ran out or the child exited before anything was seen.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start win_console_handler.py in a new process group, wait for it
        # to report readiness, send it *event* and fail (naming *name*) if
        # it does not stop.
        # One-byte tagged mmap, initialised to 0 and polled below for the
        # value 1; the tag name is passed to the child on its command line.
        # Presumably the child sets the byte once its handler is installed
        # -- verify against win_console_handler.py.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max = 0, 100
        while count < max and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # NOTE(review): `not proc.poll()` is true for a still-running child
        # (None) and also for an exit status of 0 -- confirm the child can
        # never exit with status 0 here.
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1358
1359
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and link-aware stat calls on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions are checked with unittest assertions instead of
        # bare ``assert`` statements so they still run under ``python -O``.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is used because the target of this link never exists.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink, True)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the bytes form of the link name,
        with DeprecationWarning silenced for those calls.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Computed before the try block so the cleanup in ``finally`` never
        # refers to an unbound name when creating the directories fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so a failure during
            # setup is reported as itself rather than as a cleanup error.
            if os.path.exists(file1):
                os.remove(file1)
            if os.path.exists(level1):
                shutil.rmtree(level1)
1477
Brian Curtind40e6f72010-07-08 21:39:08 +00001478
class FSEncodingTests(unittest.TestCase):
    def test_nop(self):
        # Input that already has the target type comes back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here; nothing to round-trip.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001492
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001493
Brett Cannonefb00c02012-02-29 18:31:31 -05001494
class DeviceEncodingTests(unittest.TestCase):

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        bogus_fd = 123456
        self.assertIsNone(os.device_encoding(bogus_fd))

    @unittest.skipUnless(
        os.isatty(0)
        and (sys.platform.startswith('win')
             or (hasattr(locale, 'nl_langinfo')
                 and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The encoding reported for fd 0 must be a codec Python knows.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
1508
1509
class PidTests(unittest.TestCase):
    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent pid, which must be our own pid.
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1519
1520
# Adding this TestCase caused at least two different errors on the *nix
# buildbots, so it is skipped for now to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # os.getlogin() must return a non-empty name.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
1529
1530
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        original = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, original + 1)
        try:
            raised = os.getpriority(os.PRIO_PROCESS, pid)
            saturated = original >= 19 and raised <= 19
            if saturated:
                raise unittest.SkipTest(
                    "unable to reliably test setpriority at current nice level of %s" % original)
            self.assertEqual(raised, original + 1)
        finally:
            # Try to put the priority back; an EACCES failure is tolerated
            # (presumably restoring the old value can require privileges).
            try:
                os.setpriority(os.PRIO_PROCESS, pid, original)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
1553
1554
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket served by an asyncore loop in its own thread.

        Each accepted connection is wrapped in a ``Handler`` that greets the
        peer and buffers everything it receives; the most recent handler is
        kept in ``handler_instance`` so a test can inspect the data.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that records all received bytes."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the server thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to finish and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # ``with`` releases the lock even if asyncore.loop() raises;
                # a bare acquire()/release() pair would leave it held.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1638
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001639
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile(), sending a file to a SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
                               not sys.platform.startswith("solaris") and \
                               not sys.platform.startswith("sunos")

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError (ECONNRESET included) propagates to the caller.  *headers*
        and *trailers* are only forwarded when SUPPORT_HEADERS_TRAILERS is
        true.  Their defaults are immutable empty tuples rather than shared
        mutable lists.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # otherwise: we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            # Compare the payload itself (length first, like the other
            # tests here) instead of only comparing hash values.
            self.assertEqual(len(data), len(expected_data))
            self.assertEqual(data, expected_data)

        def test_trailers(self):
            TESTFN2 = support.TESTFN + "2"
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            # Register the cleanup as soon as the file exists so it is
            # removed even if reopening or sending fails.
            self.addCleanup(os.remove, TESTFN2)
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcde12345")

    if hasattr(os, "SF_NODISKIO"):
        def test_flags(self):
            try:
                os.sendfile(self.sockno, self.fileno, 0, 4096,
                            flags=os.SF_NODISKIO)
            except OSError as err:
                # EBUSY and EAGAIN are accepted outcomes for this flag.
                if err.errno not in (errno.EBUSY, errno.EAGAIN):
                    raise
1811
1812
@support.skip_unless_xattr
class ExtendedAttributeTests(unittest.TestCase):

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
        # Drive one get/set/remove/list implementation through a fixed
        # scenario; *s* converts each attribute name to the type under test.
        path = support.TESTFN
        with open(path, "wb"):
            pass

        # A fresh file has no "user.test" attribute.
        with self.assertRaises(OSError) as caught:
            getxattr(path, s("user.test"))
        self.assertEqual(caught.exception.errno, errno.ENODATA)

        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        setxattr(path, s("user.test"), b"")
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test"), b"")

        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE)
        self.assertEqual(getxattr(path, b"user.test"), b"hello")

        # XATTR_CREATE on an existing attribute must fail ...
        with self.assertRaises(OSError) as caught:
            setxattr(path, s("user.test"), b"bye", os.XATTR_CREATE)
        self.assertEqual(caught.exception.errno, errno.EEXIST)

        # ... and so must XATTR_REPLACE on a missing one.
        with self.assertRaises(OSError) as caught:
            setxattr(path, s("user.test2"), b"bye", os.XATTR_REPLACE)
        self.assertEqual(caught.exception.errno, errno.ENODATA)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        removexattr(path, s("user.test"))
        with self.assertRaises(OSError) as caught:
            getxattr(path, s("user.test"))
        self.assertEqual(caught.exception.errno, errno.ENODATA)
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2")), b"foo")

        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value)
        self.assertEqual(getxattr(path, s("user.test")), big_value)
        removexattr(path, s("user.test"))

        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x")
        self.assertEqual(set(listxattr(path)), set(initial).union(many))

    def _check_xattrs(self, *args):
        # Run the scenario with str names, then again with bytes names.
        def make_bytes(s):
            return s.encode("ascii")

        self._check_xattrs_str(str, *args)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(make_bytes, *args)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
                           os.llistxattr)

    def test_fds(self):
        # Adapt the fd-based functions to the path-based signatures that
        # _check_xattrs() expects by opening the file around each call.
        def getxattr(path, *args):
            with open(path, "rb") as handle:
                return os.fgetxattr(handle.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb") as handle:
                os.fsetxattr(handle.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb") as handle:
                os.fremovexattr(handle.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as handle:
                return os.flistxattr(handle.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1887
1888
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    def test_deprecated(self):
        # With DeprecationWarning turned into an error, each call below
        # with a bytes filename is expected to raise it.
        import nt
        filename = os.fsencode(support.TESTFN)
        calls = [
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        ]
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            for call in calls:
                func, args = call[0], call[1:]
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        # Same check for os.symlink(), which needs symlink support.
        filename = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            self.assertRaises(DeprecationWarning,
                              os.symlink, filename, filename)
1924
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001925
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            if sys.platform != "win32" and exc.errno not in (errno.EINVAL,
                                                             errno.ENOTTY):
                raise
            self.skipTest("failed to query terminal size")

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            stty_output = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        fields = stty_output.decode().split()
        expected = (int(fields[1]), int(fields[0]))  # reversed order

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            if sys.platform != "win32" and exc.errno not in (errno.EINVAL,
                                                             errno.ENOTTY):
                raise
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
1968
1969
@support.reap_threads
def test_main():
    # Test case classes of this module, passed to support.run_unittest()
    # in this order.
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
    )
    support.run_unittest(*test_classes)

if __name__ == "__main__":
    test_main()