blob: 3ee5a1e08a3b1a8e444f6de6c02e6cd140b37b78 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000027try:
28 import threading
29except ImportError:
30 threading = None
Georg Brandl2daf6ae2012-02-20 19:54:16 +010031from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000032
# Make sure os.stat() reports float timestamps before probing for sub-second
# resolution.  os.stat_float_times() is a deprecated switch, so only call it
# where it is available.
if hasattr(os, "stat_float_times"):
    os.stat_float_times(True)
st = os.stat(__file__)
stat_supports_subsecond = (
    # check if float and int timestamps are different
    (st.st_atime != st[stat.ST_ATIME])
    or (st.st_mtime != st[stat.ST_MTIME])
    or (st.st_ctime != st[stat.ST_CTIME]))
40
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
# The flag is False when sys.thread_info is missing or has no version string.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))
Brian Curtineb24d742010-04-12 17:16:38 +000049
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file functions operating on support.TESTFN."""

    def setUp(self):
        # Remove any leftover TESTFN.  The same routine doubles as tearDown.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must leave the refcount of its
        # path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion above fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first so that fdopen_helper() can open it.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        # The existing destination must be overwritten by the source.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
156
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200157
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Tests for os.stat()/os.statvfs() results and the utime family.

    Each test runs against a directory support.TESTFN containing a
    three-byte file "f1" (self.fname).
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        """Check the os.stat() result for *fname* (a str or bytes path)."""
        if not hasattr(os, "stat"):
            return

        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                # The tuple slots hold integer timestamps while the
                # attributes may be floats, so truncate before comparing.
                if name.endswith("TIME"):
                    trunc = int
                else:
                    trunc = lambda x: x
                self.assertEqual(trunc(getattr(result, attr)),
                                 result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # A TypeError is tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            return

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                return
            # Any other failure is a real error; without this re-raise the
            # code below would fail with a NameError on 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # A TypeError is tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    def _test_utime(self, filename, attr, utime, delta):
        """Exercise the ways of setting timestamps on *filename*.

        *attr(stat_result, name)* reads a timestamp, *utime(path, times)*
        sets explicit times, and *delta* is the tolerance (in the units
        returned by *attr*) allowed between two "set to now" calls.
        """
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Stat the file that was just touched (not support.TESTFN), so the
        # comparison below really covers the call above.
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"),
                               delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)

    def _test_utime_ns(self, set_times_ns, test_dir=True):
        """Run _test_utime() against the nanosecond (*_ns) stat fields.

        *set_times_ns(path, times)* sets times given in nanoseconds; the
        directory is checked as well unless *test_dir* is false.
        """
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    requires_lutimes = unittest.skipUnless(hasattr(os, 'lutimes'),
                            "os.lutimes required for this test.")
    requires_futimes = unittest.skipUnless(hasattr(os, 'futimes'),
                            "os.futimes required for this test.")

    @requires_lutimes
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.lutimes(file, ns=times)
        self._test_utime_ns(lutimes_ns)

    @requires_futimes
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.futimes(f.fileno(), ns=times)
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        """Passing both a times tuple and ns= to os.<name> must fail."""
        with self.assertRaises(RuntimeError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)

    @requires_lutimes
    def test_lutimes_invalid_arguments(self):
        self._utime_invalid_arguments('lutimes', self.fname)

    @requires_futimes
    def test_futimes_invalid_arguments(self):
        with open(self.fname, "wb") as f:
            self._utime_invalid_arguments('futimes', f.fileno())

    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        """Set sub-second times via *set_time_func* and read them back.

        *set_time_func(filename, atime, mtime)* receives float timestamps.
        """
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        # Deprecated switch: only call it where it is available.
        if hasattr(os, "stat_float_times"):
            os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_futimes
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.futimes(f.fileno(), (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimens'),
                         "os.futimens required for this test.")
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                # Split each float into (whole seconds, nanoseconds).
                asec, ansec = divmod(atime, 1.0)
                asec = int(asec)
                ansec = int(ansec * 1e9)
                msec, mnsec = divmod(mtime, 1.0)
                msec = int(msec)
                mnsec = int(mnsec * 1e9)
                os.futimens(f.fileno(),
                            (asec, ansec),
                            (msec, mnsec))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'futimesat'),
                         "os.futimesat required for this test.")
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.futimesat(dirfd, os.path.basename(filename),
                             (atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_lutimes
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.lutimes(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @unittest.skipUnless(hasattr(os, 'utimensat'),
                         "os.utimensat required for this test.")
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # Split each float into (whole seconds, nanoseconds).
                asec, ansec = divmod(atime, 1.0)
                asec = int(asec)
                ansec = int(ansec * 1e9)
                msec, mnsec = divmod(mtime, 1.0)
                msec = int(msec)
                mnsec = int(mnsec * 1e9)
                os.utimensat(dirfd, os.path.basename(filename),
                             (asec, ansec),
                             (msec, mnsec))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            # Called while the class body executes (no self): return the
            # file system name of the volume holding *path*, or None if
            # GetVolumeInformationW() reports failure.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
493
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000494from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000495
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """check that os.environ object conform to mapping protocol"""
    type2test = None

    def setUp(self):
        # Snapshot the environment so that tearDown() can restore it, then
        # install the reference items used by the tests.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        """Return the items that setUp() stores in os.environ."""
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        """Clear os.environ and return it."""
        os.environ.clear()
        return os.environ

    # Bug 1110478
    def test_update2(self):
        os.environ.clear()
        if os.path.exists("/bin/sh"):
            os.environ.update(HELLO="World")
            with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
                value = popen.read().strip()
                self.assertEqual(value, "World")

    def test_os_popen_iter(self):
        if os.path.exists("/bin/sh"):
            with os.popen(
                "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
                it = iter(popen)
                self.assertEqual(next(it), "line1\n")
                self.assertEqual(next(it), "line2\n")
                self.assertEqual(next(it), "line3\n")
                self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily rebind os.environ itself to a plain dict.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100636
Tim Petersc4e09402003-04-25 07:11:48 +0000637class WalkTests(unittest.TestCase):
638 """Tests for os.walk()."""
639
Charles-François Natali7372b062012-02-05 15:15:38 +0100640 def setUp(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000641 import os
642 from os.path import join
643
644 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000645 # TESTFN/
646 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000647 # tmp1
648 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000649 # tmp2
650 # SUB11/ no kids
651 # SUB2/ a file kid and a dirsymlink kid
652 # tmp3
653 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200654 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000655 # TEST2/
656 # tmp4 a lone file
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000657 walk_path = join(support.TESTFN, "TEST1")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000658 sub1_path = join(walk_path, "SUB1")
Tim Petersc4e09402003-04-25 07:11:48 +0000659 sub11_path = join(sub1_path, "SUB11")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000660 sub2_path = join(walk_path, "SUB2")
661 tmp1_path = join(walk_path, "tmp1")
Tim Petersc4e09402003-04-25 07:11:48 +0000662 tmp2_path = join(sub1_path, "tmp2")
663 tmp3_path = join(sub2_path, "tmp3")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000664 link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000665 t2_path = join(support.TESTFN, "TEST2")
666 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200667 link_path = join(sub2_path, "link")
668 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000669
670 # Create stuff.
671 os.makedirs(sub11_path)
672 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000673 os.makedirs(t2_path)
674 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Alex Martelli01c77c62006-08-24 02:58:11 +0000675 f = open(path, "w")
Tim Petersc4e09402003-04-25 07:11:48 +0000676 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
677 f.close()
Brian Curtin3b4499c2010-12-28 14:31:47 +0000678 if support.can_symlink():
Antoine Pitrou5311c1d2012-01-24 08:59:28 +0100679 if os.name == 'nt':
680 def symlink_to_dir(src, dest):
681 os.symlink(src, dest, True)
682 else:
683 symlink_to_dir = os.symlink
684 symlink_to_dir(os.path.abspath(t2_path), link_path)
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200685 symlink_to_dir('broken', broken_link_path)
686 sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
Guido van Rossumd8faa362007-04-27 19:54:29 +0000687 else:
688 sub2_tree = (sub2_path, [], ["tmp3"])
Tim Petersc4e09402003-04-25 07:11:48 +0000689
690 # Walk top-down.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000691 all = list(os.walk(walk_path))
Tim Petersc4e09402003-04-25 07:11:48 +0000692 self.assertEqual(len(all), 4)
693 # We can't know which order SUB1 and SUB2 will appear in.
694 # Not flipped: TESTFN, SUB1, SUB11, SUB2
695 # flipped: TESTFN, SUB2, SUB1, SUB11
696 flipped = all[0][1][0] != "SUB1"
697 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200698 all[3 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000699 self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000700 self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
701 self.assertEqual(all[2 + flipped], (sub11_path, [], []))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000702 self.assertEqual(all[3 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000703
704 # Prune the search.
705 all = []
Guido van Rossumd8faa362007-04-27 19:54:29 +0000706 for root, dirs, files in os.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000707 all.append((root, dirs, files))
708 # Don't descend into SUB1.
709 if 'SUB1' in dirs:
710 # Note that this also mutates the dirs we appended to all!
711 dirs.remove('SUB1')
712 self.assertEqual(len(all), 2)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000713 self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200714 all[1][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000715 self.assertEqual(all[1], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000716
717 # Walk bottom-up.
Guido van Rossumd8faa362007-04-27 19:54:29 +0000718 all = list(os.walk(walk_path, topdown=False))
Tim Petersc4e09402003-04-25 07:11:48 +0000719 self.assertEqual(len(all), 4)
720 # We can't know which order SUB1 and SUB2 will appear in.
721 # Not flipped: SUB11, SUB1, SUB2, TESTFN
722 # flipped: SUB2, SUB11, SUB1, TESTFN
723 flipped = all[3][1][0] != "SUB1"
724 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200725 all[2 - 2 * flipped][-1].sort()
Guido van Rossumd8faa362007-04-27 19:54:29 +0000726 self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
Tim Petersc4e09402003-04-25 07:11:48 +0000727 self.assertEqual(all[flipped], (sub11_path, [], []))
728 self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
Guido van Rossumd8faa362007-04-27 19:54:29 +0000729 self.assertEqual(all[2 - 2 * flipped], sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000730
Brian Curtin3b4499c2010-12-28 14:31:47 +0000731 if support.can_symlink():
Guido van Rossumd8faa362007-04-27 19:54:29 +0000732 # Walk, following symlinks.
733 for root, dirs, files in os.walk(walk_path, followlinks=True):
734 if root == link_path:
735 self.assertEqual(dirs, [])
736 self.assertEqual(files, ["tmp4"])
737 break
738 else:
739 self.fail("Didn't follow symlink with followlinks=True")
740
741 def tearDown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000742 # Tear everything down. This is a decent use for bottom-up on
743 # Windows, which doesn't have a recursive delete command. The
744 # (not so) subtlety is that rmdir will fail unless the dir's
745 # kids are removed first, so bottom up is essential.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000746 for root, dirs, files in os.walk(support.TESTFN, topdown=False):
Tim Petersc4e09402003-04-25 07:11:48 +0000747 for name in files:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000748 os.remove(os.path.join(root, name))
Tim Petersc4e09402003-04-25 07:11:48 +0000749 for name in dirs:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000750 dirname = os.path.join(root, name)
751 if not os.path.islink(dirname):
752 os.rmdir(dirname)
753 else:
754 os.remove(dirname)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000755 os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000756
Charles-François Natali7372b062012-02-05 15:15:38 +0100757
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    # NOTE(review): os.flistdir(), os.unlinkat() and os.fstatat() are used
    # below exactly as in the original; confirm they exist in the os module
    # of the interpreter this file targets.

    def test_compare_to_walk(self):
        # fwalk() must report the same dirs/files as walk() for every
        # directory it visits, for each topdown/followlinks combination.
        for topdown in (True, False):
            for followlinks in (True, False):
                walk_args = (support.TESTFN, topdown, None, followlinks)
                expected = {
                    root: (set(dirs), set(files))
                    for root, dirs, files in os.walk(*walk_args)
                }

                for root, dirs, files, rootfd in os.fwalk(*walk_args):
                    self.assertIn(root, expected)
                    self.assertEqual(expected[root],
                                     (set(dirs), set(files)))

    def test_dir_fd(self):
        # The descriptor yielded with each directory must be usable.
        for topdown in (True, False):
            for followlinks in (True, False):
                walk_args = (support.TESTFN, topdown, None, followlinks)
                for root, dirs, files, rootfd in os.fwalk(*walk_args):
                    # fstat() raises if the FD is not valid.
                    os.fstat(rootfd)
                    # flistdir() on the FD must agree with dirs + files.
                    entries = set(dirs) | set(files)
                    self.assertEqual(set(os.flistdir(rootfd)), entries)

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid
        # leaks: we both check that calling fwalk() a large number of times
        # doesn't yield EMFILE, and that the minimum allocated FD hasn't
        # changed.
        lowest_free = os.dup(1)
        os.close(lowest_free)
        for _ in range(256):
            for _entry in os.fwalk(support.TESTFN):
                pass
        probe = os.dup(1)
        self.addCleanup(os.close, probe)
        self.assertEqual(probe, lowest_free)

    def tearDown(self):
        # Remove the tree bottom-up, addressing every entry relative to the
        # directory descriptor handed out by fwalk().
        walker = os.fwalk(support.TESTFN, topdown=False)
        for root, dirs, files, rootfd in walker:
            for name in files:
                os.unlinkat(rootfd, name)
            for name in dirs:
                info = os.fstatat(rootfd, name, os.AT_SYMLINK_NOFOLLOW)
                if not stat.S_ISDIR(info.st_mode):
                    # A symlink listed among dirs: remove it like a file.
                    os.unlinkat(rootfd, name)
                else:
                    os.unlinkat(rootfd, name, os.AT_REMOVEDIR)
        os.rmdir(support.TESTFN)
809
810
Guido van Rossume7ba4952007-06-06 23:52:48 +0000811class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000812 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000813 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000814
815 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000816 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000817 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
818 os.makedirs(path) # Should work
819 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
820 os.makedirs(path)
821
822 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000823 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000824 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
825 os.makedirs(path)
826 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
827 'dir5', 'dir6')
828 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000829
Terry Reedy5a22b652010-12-02 07:05:56 +0000830 def test_exist_ok_existing_directory(self):
831 path = os.path.join(support.TESTFN, 'dir1')
832 mode = 0o777
833 old_mask = os.umask(0o022)
834 os.makedirs(path, mode)
835 self.assertRaises(OSError, os.makedirs, path, mode)
836 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
837 self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
838 os.makedirs(path, mode=mode, exist_ok=True)
839 os.umask(old_mask)
840
Gregory P. Smitha81c8562012-06-03 14:30:44 -0700841 def test_exist_ok_s_isgid_directory(self):
842 path = os.path.join(support.TESTFN, 'dir1')
843 S_ISGID = stat.S_ISGID
844 mode = 0o777
845 old_mask = os.umask(0o022)
846 try:
847 existing_testfn_mode = stat.S_IMODE(
848 os.lstat(support.TESTFN).st_mode)
849 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
850 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
851 raise unittest.SkipTest('No support for S_ISGID dir mode.')
852 # The os should apply S_ISGID from the parent dir for us, but
853 # this test need not depend on that behavior. Be explicit.
854 os.makedirs(path, mode | S_ISGID)
855 # http://bugs.python.org/issue14992
856 # Should not fail when the bit is already set.
857 os.makedirs(path, mode, exist_ok=True)
858 # remove the bit.
859 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
860 with self.assertRaises(OSError):
861 # Should fail when the bit is not already set when demanded.
862 os.makedirs(path, mode | S_ISGID, exist_ok=True)
863 finally:
864 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +0000865
866 def test_exist_ok_existing_regular_file(self):
867 base = support.TESTFN
868 path = os.path.join(support.TESTFN, 'dir1')
869 f = open(path, 'w')
870 f.write('abc')
871 f.close()
872 self.assertRaises(OSError, os.makedirs, path)
873 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
874 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
875 os.remove(path)
876
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000877 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000878 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000879 'dir4', 'dir5', 'dir6')
880 # If the tests failed, the bottom-most directory ('../dir6')
881 # may not have been created, so we look for the outermost directory
882 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000883 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000884 path = os.path.dirname(path)
885
886 os.removedirs(path)
887
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        # Writes to the null device are accepted; the with block closes
        # the file, so no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        # Reading from it yields nothing.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000895
Guido van Rossume7ba4952007-06-06 23:52:48 +0000896class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +0100897 def test_urandom_length(self):
898 self.assertEqual(len(os.urandom(0)), 0)
899 self.assertEqual(len(os.urandom(1)), 1)
900 self.assertEqual(len(os.urandom(10)), 10)
901 self.assertEqual(len(os.urandom(100)), 100)
902 self.assertEqual(len(os.urandom(1000)), 1000)
903
904 def test_urandom_value(self):
905 data1 = os.urandom(16)
906 data2 = os.urandom(16)
907 self.assertNotEqual(data1, data2)
908
909 def get_urandom_subprocess(self, count):
910 code = '\n'.join((
911 'import os, sys',
912 'data = os.urandom(%s)' % count,
913 'sys.stdout.buffer.write(data)',
914 'sys.stdout.buffer.flush()'))
915 out = assert_python_ok('-c', code)
916 stdout = out[1]
917 self.assertEqual(len(stdout), 16)
918 return stdout
919
920 def test_urandom_subprocess(self):
921 data1 = self.get_urandom_subprocess(16)
922 data2 = self.get_urandom_subprocess(16)
923 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000924
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000925@contextlib.contextmanager
926def _execvpe_mockup(defpath=None):
927 """
928 Stubs out execv and execve functions when used as context manager.
929 Records exec calls. The mock execv and execve functions always raise an
930 exception as they would normally never return.
931 """
932 # A list of tuples containing (function name, first arg, args)
933 # of calls to execv or execve that have been made.
934 calls = []
935
936 def mock_execv(name, *args):
937 calls.append(('execv', name, args))
938 raise RuntimeError("execv called")
939
940 def mock_execve(name, *args):
941 calls.append(('execve', name, args))
942 raise OSError(errno.ENOTDIR, "execve called")
943
944 try:
945 orig_execv = os.execv
946 orig_execve = os.execve
947 orig_defpath = os.defpath
948 os.execv = mock_execv
949 os.execve = mock_execve
950 if defpath is not None:
951 os.defpath = defpath
952 yield calls
953 finally:
954 os.execv = orig_execv
955 os.execve = orig_execve
956 os.defpath = orig_defpath
957
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list is rejected.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with str or bytes program names (selected by
        # test_type) while execv/execve are replaced by recording mocks.
        search_dir = os.sep + 'absolutepath'
        use_bytes = test_type is bytes
        if use_bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(search_dir), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(search_dir, program)
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=search_dir) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath, (arguments, env))
            self.assertSequenceEqual(calls[0], expected_call)

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if use_bytes else 'PATH'
            env_path[path_key] = search_dir
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            expected_call = ('execve', native_fullpath,
                             (arguments, env_path))
            self.assertSequenceEqual(calls[0], expected_call)

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)
        # The bytes variant is only exercised off Windows.
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001021
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001022
Thomas Wouters477c8d52006-05-27 19:21:47 +00001023class Win32ErrorTests(unittest.TestCase):
1024 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001025 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001026
1027 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001028 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001029
1030 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001031 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001032
1033 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001034 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +00001035 try:
1036 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
1037 finally:
1038 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +00001039 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001040
1041 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001042 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001043
Thomas Wouters477c8d52006-05-27 19:21:47 +00001044 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +00001045 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001046
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001047class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001048 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001049 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1050 #singles.append("close")
1051 #We omit close because it doesn'r raise an exception on some platforms
1052 def get_single(f):
1053 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001054 if hasattr(os, f):
1055 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001056 return helper
1057 for f in singles:
1058 locals()["test_"+f] = get_single(f)
1059
Benjamin Peterson7522c742009-01-19 21:00:09 +00001060 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001061 try:
1062 f(support.make_bad_fd(), *args)
1063 except OSError as e:
1064 self.assertEqual(e.errno, errno.EBADF)
1065 else:
1066 self.fail("%r didn't raise a OSError with a bad file descriptor"
1067 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001068
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001069 def test_isatty(self):
1070 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001071 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001072
1073 def test_closerange(self):
1074 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001075 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +00001076 # Make sure none of the descriptors we are about to close are
1077 # currently valid (issue 6542).
1078 for i in range(10):
1079 try: os.fstat(fd+i)
1080 except OSError:
1081 pass
1082 else:
1083 break
1084 if i < 2:
1085 raise unittest.SkipTest(
1086 "Unable to acquire a range of invalid file descriptors")
1087 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001088
1089 def test_dup2(self):
1090 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001091 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001092
1093 def test_fchmod(self):
1094 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001095 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001096
1097 def test_fchown(self):
1098 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001099 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001100
1101 def test_fpathconf(self):
1102 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001103 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001104
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001105 def test_ftruncate(self):
1106 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001107 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001108
1109 def test_lseek(self):
1110 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001111 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001112
1113 def test_read(self):
1114 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001115 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001116
1117 def test_tcsetpgrpt(self):
1118 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001119 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001120
1121 def test_write(self):
1122 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001123 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001124
Brian Curtin1b9df392010-11-24 20:24:31 +00001125
1126class LinkTests(unittest.TestCase):
1127 def setUp(self):
1128 self.file1 = support.TESTFN
1129 self.file2 = os.path.join(support.TESTFN + "2")
1130
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001131 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001132 for file in (self.file1, self.file2):
1133 if os.path.exists(file):
1134 os.unlink(file)
1135
Brian Curtin1b9df392010-11-24 20:24:31 +00001136 def _test_link(self, file1, file2):
1137 with open(file1, "w") as f1:
1138 f1.write("test")
1139
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001140 with warnings.catch_warnings():
1141 warnings.simplefilter("ignore", DeprecationWarning)
1142 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001143 with open(file1, "r") as f1, open(file2, "r") as f2:
1144 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1145
1146 def test_link(self):
1147 self._test_link(self.file1, self.file2)
1148
1149 def test_link_bytes(self):
1150 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1151 bytes(self.file2, sys.getfilesystemencoding()))
1152
Brian Curtinf498b752010-11-30 15:54:04 +00001153 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001154 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001155 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001156 except UnicodeError:
1157 raise unittest.SkipTest("Unable to encode for this platform.")
1158
Brian Curtinf498b752010-11-30 15:54:04 +00001159 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001160 self.file2 = self.file1 + "2"
1161 self._test_link(self.file1, self.file2)
1162
# Off Windows, Win32ErrorTests is rebound to an empty stub and the POSIX-only
# test classes are defined; on Windows the POSIX classes are the empty stubs
# instead.
if sys.platform != 'win32':
    class Win32ErrorTests(unittest.TestCase):
        pass

    class PosixUidGidTests(unittest.TestCase):
        # Each test method is only defined when the os module provides the
        # corresponding function.  When not running as uid 0, setting the id
        # to 0 must raise os.error; an id of 1<<32 must raise OverflowError.
        if hasattr(os, 'setuid'):
            def test_setuid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setuid, 0)
                self.assertRaises(OverflowError, os.setuid, 1<<32)

        if hasattr(os, 'setgid'):
            def test_setgid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setgid, 0)
                self.assertRaises(OverflowError, os.setgid, 1<<32)

        if hasattr(os, 'seteuid'):
            def test_seteuid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.seteuid, 0)
                self.assertRaises(OverflowError, os.seteuid, 1<<32)

        if hasattr(os, 'setegid'):
            def test_setegid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setegid, 0)
                self.assertRaises(OverflowError, os.setegid, 1<<32)

        if hasattr(os, 'setreuid'):
            def test_setreuid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setreuid, 0, 0)
                self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
                self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)

            def test_setreuid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                subprocess.check_call([
                        sys.executable, '-c',
                        'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])

        if hasattr(os, 'setregid'):
            def test_setregid(self):
                if os.getuid() != 0:
                    self.assertRaises(os.error, os.setregid, 0, 0)
                self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
                self.assertRaises(OverflowError, os.setregid, 0, 1<<32)

            def test_setregid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                subprocess.check_call([
                        sys.executable, '-c',
                        'import os,sys;os.setregid(-1,-1);sys.exit(0)'])

    class Pep383Tests(unittest.TestCase):
        # Exercises listdir/open/stat on a directory populated with
        # non-ASCII file names.
        def setUp(self):
            # Prefer the unencodable test name for the directory when the
            # support module provides one.
            if support.TESTFN_UNENCODABLE:
                self.dir = support.TESTFN_UNENCODABLE
            else:
                self.dir = support.TESTFN
            self.bdir = os.fsencode(self.dir)

            # Collect the bytes form of every candidate name that
            # os.fsencode() can encode; names that cannot be encoded are
            # dropped.
            bytesfn = []
            def add_filename(fn):
                try:
                    fn = os.fsencode(fn)
                except UnicodeEncodeError:
                    return
                bytesfn.append(fn)
            add_filename(support.TESTFN_UNICODE)
            if support.TESTFN_UNENCODABLE:
                add_filename(support.TESTFN_UNENCODABLE)
            if not bytesfn:
                self.skipTest("couldn't create any non-ascii filename")

            # str names of the files created below, as produced by
            # os.fsdecode().
            self.unicodefn = set()
            os.mkdir(self.dir)
            try:
                for fn in bytesfn:
                    support.create_empty_file(os.path.join(self.bdir, fn))
                    fn = os.fsdecode(fn)
                    if fn in self.unicodefn:
                        raise ValueError("duplicate filename")
                    self.unicodefn.add(fn)
            except:
                # tearDown() does not run when setUp() fails, so remove the
                # directory here before re-raising.
                shutil.rmtree(self.dir)
                raise

        def tearDown(self):
            shutil.rmtree(self.dir)

        def test_listdir(self):
            # listdir() on the str path must return exactly the decoded names.
            expected = self.unicodefn
            found = set(os.listdir(self.dir))
            self.assertEqual(found, expected)

        def test_open(self):
            # Every decoded name must be openable.
            for fn in self.unicodefn:
                f = open(os.path.join(self.dir, fn), 'rb')
                f.close()

        def test_stat(self):
            # Every decoded name must be stat-able.
            for fn in self.unicodefn:
                os.stat(os.path.join(self.dir, fn))
else:
    class PosixUidGidTests(unittest.TestCase):
        pass
    class Pep383Tests(unittest.TestCase):
        pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001275
Brian Curtineb24d742010-04-12 17:16:38 +00001276@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1277class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001278 def _kill(self, sig):
1279 # Start sys.executable as a subprocess and communicate from the
1280 # subprocess to the parent that the interpreter is ready. When it
1281 # becomes ready, send *sig* via os.kill to the subprocess and check
1282 # that the return code is equal to *sig*.
1283 import ctypes
1284 from ctypes import wintypes
1285 import msvcrt
1286
1287 # Since we can't access the contents of the process' stdout until the
1288 # process has exited, use PeekNamedPipe to see what's inside stdout
1289 # without waiting. This is done so we can tell that the interpreter
1290 # is started and running at a point where it could handle a signal.
1291 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1292 PeekNamedPipe.restype = wintypes.BOOL
1293 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1294 ctypes.POINTER(ctypes.c_char), # stdout buf
1295 wintypes.DWORD, # Buffer size
1296 ctypes.POINTER(wintypes.DWORD), # bytes read
1297 ctypes.POINTER(wintypes.DWORD), # bytes avail
1298 ctypes.POINTER(wintypes.DWORD)) # bytes left
1299 msg = "running"
1300 proc = subprocess.Popen([sys.executable, "-c",
1301 "import sys;"
1302 "sys.stdout.write('{}');"
1303 "sys.stdout.flush();"
1304 "input()".format(msg)],
1305 stdout=subprocess.PIPE,
1306 stderr=subprocess.PIPE,
1307 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001308 self.addCleanup(proc.stdout.close)
1309 self.addCleanup(proc.stderr.close)
1310 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001311
1312 count, max = 0, 100
1313 while count < max and proc.poll() is None:
1314 # Create a string buffer to store the result of stdout from the pipe
1315 buf = ctypes.create_string_buffer(len(msg))
1316 # Obtain the text currently in proc.stdout
1317 # Bytes read/avail/left are left as NULL and unused
1318 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1319 buf, ctypes.sizeof(buf), None, None, None)
1320 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1321 if buf.value:
1322 self.assertEqual(msg, buf.value.decode())
1323 break
1324 time.sleep(0.1)
1325 count += 1
1326 else:
1327 self.fail("Did not receive communication from the subprocess")
1328
Brian Curtineb24d742010-04-12 17:16:38 +00001329 os.kill(proc.pid, sig)
1330 self.assertEqual(proc.wait(), sig)
1331
1332 def test_kill_sigterm(self):
1333 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001334 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001335
1336 def test_kill_int(self):
1337 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001338 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001339
def _kill_with_event(self, event, name):
    """Send a console control event to a child and check that it exits.

    A child interpreter running ``win_console_handler.py`` (looked up next
    to this file) is started in a new process group.  The tag name of a
    one-byte shared mapping is passed on its command line; this method
    polls that byte until it reads 1, which the child is expected to write
    once it is ready (the child script is not visible from here).

    event -- value handed to os.kill() for the child's pid.
    name  -- label for *event*, used only in the failure message.

    Fails the test if the child does not signal readiness within the
    polling budget, or is still running 0.5 seconds after the event.
    """
    tagname = "test_os_%s" % uuid.uuid1()
    m = mmap.mmap(-1, 1, tagname)
    try:
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen(
            [sys.executable,
             os.path.join(os.path.dirname(__file__),
                          "win_console_handler.py"),
             tagname],
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            # The loop also ends when the child died on its own; killing a
            # finished process would raise and hide the failure below.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit status of 0 is falsy too, so compare against None explicitly.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))
    finally:
        # Release this process's handle on the shared mapping.
        m.close()
1368
@unittest.skip("subprocesses aren't inheriting CTRL+C property")
def test_CTRL_C_EVENT(self):
    """Enable CTRL+C handling for this process, then run the event check."""
    import ctypes
    from ctypes import wintypes

    # Make a NULL value by creating a pointer with no argument.
    null_handler = ctypes.POINTER(ctypes.c_int)()
    set_ctrl_handler = ctypes.windll.kernel32.SetConsoleCtrlHandler
    set_ctrl_handler.restype = wintypes.BOOL
    set_ctrl_handler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                 wintypes.BOOL)

    # Calling this with NULL and FALSE causes the calling process to
    # handle CTRL+C, rather than ignore it. This property is inherited
    # by subprocesses.
    set_ctrl_handler(null_handler, 0)

    self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1387
def test_CTRL_BREAK_EVENT(self):
    """Run the console-event check with ``signal.CTRL_BREAK_EVENT``."""
    event = signal.CTRL_BREAK_EVENT
    self._kill_with_event(event, "CTRL_BREAK_EVENT")
1390
1391
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Checks of os.symlink() and related os/os.path calls on Windows."""

    # Link names are relative, so they are created in the current directory.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: both targets exist and no link is left over from
        # an earlier run.  TestCase assertions are used so the checks are
        # still performed when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is needed here: the link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink, True)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check stat()/lstat() of *link* against *target*.

        os.stat(link) must equal os.stat(target) and differ from
        os.lstat(link); the same holds for the bytes form of *link*.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        with warnings.catch_warnings():
            # Bytes paths may emit DeprecationWarning here; keep it quiet.
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        """os.stat() of a relative symlink works from several directories."""
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Pure path computation; done before the try block so the name is
        # always bound, whatever fails afterwards.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Removing the tree also removes file1 and the link.  Guard on
            # existence so a failure before level1 was created is not
            # replaced by a cleanup error.
            if os.path.lexists(level1):
                shutil.rmtree(level1)
1509
Brian Curtind40e6f72010-07-08 21:39:08 +00001510
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() are
        # expected to come back equal to the input
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # not representable in the filesystem encoding: skip it
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001524
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001525
Brett Cannonefb00c02012-02-29 18:31:31 -05001526
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        result = os.device_encoding(123456)
        self.assertIsNone(result)

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # With fd 0 on a tty, a name that codecs can look up is expected.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        codec_info = codecs.lookup(name)
        self.assertTrue(codec_info)
1540
1541
class PidTests(unittest.TestCase):
    """Checks for the process-id functions of os."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child interpreter prints the pid of its parent on stdout.
        child_code = 'import os; print(os.getppid())'
        child = subprocess.Popen([sys.executable, '-c', child_code],
                                 stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        reported_parent = int(output)
        self.assertEqual(reported_parent, os.getpid())
1551
1552
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check of os.getlogin(); the whole class is currently skipped."""

    def test_getlogin(self):
        # The login name reported by the OS must not be empty.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
1561
1562
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        which = os.PRIO_PROCESS
        me = os.getpid()

        original = os.getpriority(which, me)
        bumped = original + 1
        os.setpriority(which, me, bumped)
        try:
            current = os.getpriority(which, me)
            if original >= 19 and current <= 19:
                # Starting at 19 or above, a result of at most 19 cannot
                # tell whether the +1 request had any effect.
                raise unittest.SkipTest(
      "unable to reliably test setpriority at current nice level of %s" % original)
            self.assertEqual(current, bumped)
        finally:
            # Best effort: put the old value back; EACCES is tolerated.
            try:
                os.setpriority(which, me, original)
            except OSError as exc:
                if exc.errno != errno.EACCES:
                    raise
1585
1586
# The server class needs threading.Thread as a base, so it is only defined
# when the threading module could be imported.
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket served by an asyncore loop in its own thread.

        Each accepted connection is wrapped in a Handler, which stores
        everything it receives; the most recent handler is kept in
        ``handler_instance``.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that buffers every received chunk."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                # chunks received so far, in arrival order
                self.in_buffer = []
                # becomes True once handle_close() has run
                self.closed = False
                # greeting sent to the peer as soon as the channel exists
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return all received chunks joined into one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # re-raise the exception currently being handled
                raise

        def __init__(self, address):
            """Bind a listening TCP socket to *address*; start() runs it."""
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # actual address of the bound socket, read back from it
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True between the start of run() and a call to stop()."""
            return self._active

        def start(self):
            """Start the thread and block until run() has set the flag."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to end and wait for the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            # (getattr covers handler_instance still being None)
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            """Thread body: poll asyncore until stopped or no sockets remain."""
            self._active = True
            # releases the start() call blocked in __flag.wait()
            self.__flag.set()
            while self._active and asyncore.socket_map:
                self._active_lock.acquire()
                asyncore.loop(timeout=0.001, count=1)
                self._active_lock.release()
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        # a read event on the listening socket is handled the same way
        handle_read = handle_connect

        def writable(self):
            # always falsy: this dispatcher never reports itself writable
            return 0

        def handle_error(self):
            # re-raise the exception currently being handled
            raise
1670
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001671
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() using a client socket and SendfileTestServer."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # False when the platform name starts with linux, solaris or sunos.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        # The file served by every test holds exactly DATA.
        with open(support.TESTFN, "wb") as out:
            out.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        server = SendfileTestServer((support.HOST, 0))
        self.server = server
        server.start()
        client = socket.socket()
        self.client = client
        client.connect((server.host, server.port))
        client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        client.recv(1024)
        self.sockno = client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def _collect_server_data(self, shutdown):
        """Close the client and return the bytes the server handler got.

        When *shutdown* is true the socket is shut down in both directions
        before it is closed.
        """
        if shutdown:
            self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        return self.server.handler_instance.get_data()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().
        """
        retryable = (errno.EAGAIN, errno.EBUSY)
        while True:
            try:
                if not self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes)
                return os.sendfile(sock, file, offset, nbytes, headers,
                                   trailers)
            except OSError as exc:
                # EAGAIN/EBUSY: we have to retry send data.  Everything
                # else, including ECONNRESET (disconnected), propagates.
                if exc.errno not in retryable:
                    raise

    def test_send_whole_file(self):
        # normal send
        size = len(self.DATA)
        chunk = 4096
        position = 0
        sent_so_far = 0
        while sent_so_far < size:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         position, chunk)
            if sent == 0:
                break
            position += sent
            sent_so_far += sent
            self.assertTrue(sent <= chunk)
            self.assertEqual(position, sent_so_far)

        self.assertEqual(sent_so_far, size)
        received = self._collect_server_data(shutdown=True)
        self.assertEqual(len(received), size)
        self.assertEqual(received, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        half = len(self.DATA) // 2
        position = half
        remaining_total = len(self.DATA) - position
        chunk = 4096
        sent_so_far = 0
        while sent_so_far < remaining_total:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         position, chunk)
            if sent == 0:
                break
            position += sent
            sent_so_far += sent
            self.assertTrue(sent <= chunk)

        received = self._collect_server_data(shutdown=True)
        expected = self.DATA[half:]
        self.assertEqual(sent_so_far, len(expected))
        self.assertEqual(len(received), len(expected))
        self.assertEqual(received, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        past_end = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, past_end, 4096)
        except OSError as exc:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if exc.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        received = self._collect_server_data(shutdown=True)
        self.assertEqual(received, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as caught:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(caught.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            header = b"x" * 512
            # first call sends the header followed by file data
            sent_so_far = os.sendfile(self.sockno, self.fileno, 0, 4096,
                                      headers=[header])
            position = 4096
            chunk = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             position, chunk)
                if sent == 0:
                    break
                sent_so_far += sent
                position += sent

            expected_data = header + self.DATA
            self.assertEqual(sent_so_far, len(expected_data))
            received = self._collect_server_data(shutdown=False)
            self.assertEqual(hash(received), hash(expected_data))

        def test_trailers(self):
            second_file = support.TESTFN + "2"
            with open(second_file, 'wb') as out:
                out.write(b"abcde")
            with open(second_file, 'rb') as src:
                self.addCleanup(os.remove, second_file)
                os.sendfile(self.sockno, src.fileno(), 0, 4096,
                            trailers=[b"12345"])
                received = self._collect_server_data(shutdown=False)
                self.assertEqual(received, b"abcde12345")

        if hasattr(os, "SF_NODISKIO"):
            def test_flags(self):
                tolerated = (errno.EBUSY, errno.EAGAIN)
                try:
                    os.sendfile(self.sockno, self.fileno, 0, 4096,
                                flags=os.SF_NODISKIO)
                except OSError as exc:
                    if exc.errno not in tolerated:
                        raise
1843
1844
@support.skip_unless_xattr
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for the extended-attribute functions of os."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
        """Run the attribute scenario on support.TESTFN.

        *s* converts the attribute names that are written as str literals
        below; the four callables are the get/set/remove/list functions
        under test and all take the path as first argument.
        """
        def assert_errno(code, func, *args):
            # func(*args) must raise OSError carrying errno == code
            with self.assertRaises(OSError) as caught:
                func(*args)
            self.assertEqual(caught.exception.errno, code)

        path = support.TESTFN
        with open(path, "wb"):
            pass
        assert_errno(errno.ENODATA, getxattr, path, s("user.test"))
        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # create, then replace
        setxattr(path, s("user.test"), b"")
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test"), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE)
        self.assertEqual(getxattr(path, b"user.test"), b"hello")

        # flag conflicts
        assert_errno(errno.EEXIST,
                     setxattr, path, s("user.test"), b"bye", os.XATTR_CREATE)
        assert_errno(errno.ENODATA,
                     setxattr, path, s("user.test2"), b"bye", os.XATTR_REPLACE)
        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # removal
        removexattr(path, s("user.test"))
        assert_errno(errno.ENODATA, getxattr, path, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2")), b"foo")

        # a 1024-byte value
        setxattr(path, s("user.test"), b"a"*1024)
        self.assertEqual(getxattr(path, s("user.test")), b"a"*1024)
        removexattr(path, s("user.test"))

        # many attributes at once
        names = sorted("user.test{}".format(i) for i in range(100))
        for name in names:
            setxattr(path, name, b"x")
        self.assertEqual(set(listxattr(path)), set(initial) | set(names))

    def _check_xattrs(self, *args):
        """Run the scenario twice: with str names, then with bytes names."""
        def make_bytes(s):
            return s.encode("ascii")
        self._check_xattrs_str(str, *args)
        # start the second pass from a fresh file
        support.unlink(support.TESTFN)
        self._check_xattrs_str(make_bytes, *args)

    def test_simple(self):
        functions = (os.getxattr, os.setxattr, os.removexattr, os.listxattr)
        self._check_xattrs(*functions)

    def test_lpath(self):
        functions = (os.lgetxattr, os.lsetxattr, os.lremovexattr,
                     os.llistxattr)
        self._check_xattrs(*functions)

    def test_fds(self):
        # Adapt the fd-based functions to the path-based call signature by
        # opening the path for the duration of each call.
        def getxattr(path, *args):
            with open(path, "rb") as handle:
                return os.fgetxattr(handle.fileno(), *args)

        def setxattr(path, *args):
            with open(path, "wb") as handle:
                os.fsetxattr(handle.fileno(), *args)

        def removexattr(path, *args):
            with open(path, "wb") as handle:
                os.fremovexattr(handle.fileno(), *args)

        def listxattr(path, *args):
            with open(path, "rb") as handle:
                return os.flistxattr(handle.fileno(), *args)

        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1919
1920
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Bytes filenames are expected to raise DeprecationWarning on Windows."""

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # Each entry is a callable followed by its positional arguments.
        calls = (
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        )
        with warnings.catch_warnings():
            # Turn the warning into an exception so assertRaises sees it.
            warnings.simplefilter("error", DeprecationWarning)
            for func, *args in calls:
                self.assertRaises(DeprecationWarning, func, *args)

    @support.skip_unless_symlink
    def test_symlink(self):
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            self.assertRaises(DeprecationWarning,
                              os.symlink, bytes_name, bytes_name)
1956
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01001957
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            cannot_query = (sys.platform == "win32"
                            or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not cannot_query:
                raise
            self.skipTest("failed to query terminal size")

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            output = subprocess.check_output(['stty', 'size'])
            fields = output.decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        # reversed order
        columns = int(fields[1])
        lines = int(fields[0])
        expected = (columns, lines)

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            cannot_query = (sys.platform == "win32"
                            or exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not cannot_query:
                raise
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
2000
2001
@support.reap_threads
def test_main():
    """Pass every test case class listed here to support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
    )
    support.run_unittest(*test_classes)


# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()