blob: 7c73f1e1aa01fc0e690e17529f9b8483dd54e9e4 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010023import itertools
24import stat
Brett Cannonefb00c02012-02-29 18:31:31 -050025import locale
26import codecs
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000027try:
28 import threading
29except ImportError:
30 threading = None
Georg Brandl2daf6ae2012-02-20 19:54:16 +010031from test.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000032
# Switch os.stat() to float timestamps.  The DeprecationWarning is filtered
# out for the duration of the call.
with warnings.catch_warnings():
    warnings.simplefilter("ignore", DeprecationWarning)
    os.stat_float_times(True)
st = os.stat(__file__)
# check if float and int timestamps are different: compare each st_?time
# attribute with the matching slot (7, 8, 9) of the tuple interface.
stat_supports_subsecond = any(
    getattr(st, field) != st[slot]
    for field, slot in (("st_atime", 7), ("st_mtime", 8), ("st_ctime", 9)))
42
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
#
# The flag is True only when sys.thread_info exists, reports a non-empty
# version string, and that string starts with "linuxthreads".
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))
Brian Curtineb24d742010-04-12 17:16:38 +000051
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level file functions of os that create support.TESTFN.

    setUp (also used as tearDown) removes support.TESTFN if it exists, so
    every test starts and ends without the scratch file.
    """

    def setUp(self):
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                # Drop the lower fd and try again with a fresh duplicate.
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must leave the reference count of its
        # path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a check above fails, so a
            # failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and expect exit code 0."""
        retcode = subprocess.call(
            args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open support.TESTFN read-only and wrap the fd with os.fdopen().

        *args* are passed to os.fdopen() after the descriptor; the
        resulting file object is closed immediately.
        """
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first; fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        # os.replace() must overwrite an existing destination file.
        TESTFN2 = support.TESTFN + ".2"
        with open(support.TESTFN, 'w') as f:
            f.write("1")
        with open(TESTFN2, 'w') as f:
            f.write("2")
        self.addCleanup(os.unlink, TESTFN2)
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200159
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Check the results of os.stat(), os.statvfs() and os.utime().

    setUp creates the directory support.TESTFN holding one three-byte
    file, self.fname; tearDown removes both again.
    """

    def setUp(self):
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        """Verify the os.stat() result for *fname* (a str or bytes path).

        Checks tuple and attribute access, agreement of the st_?time and
        st_?time_ns fields, read-only attributes and the stat_result
        constructor.
        """
        if not hasattr(os, "stat"):
            self.skipTest("os.stat() is not available")

        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # The attribute is compared with its tuple slot after
                # truncation to an integer.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Out-of-range index
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): unlike the too-short case, a successful call is
        # not treated as a failure here -- presumably the accepted length
        # varies between platforms; confirm before tightening.
        try:
            os.stat_result((0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            self.skipTest("os.statvfs() is not available")

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest("os.statvfs() failed with ENOSYS")
            # Any other error is a real failure; swallowing it would only
            # surface later as a NameError on the unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.
        # NOTE(review): as in check_stat_attributes(), acceptance is not
        # treated as a failure here; confirm whether that is intended.
        try:
            os.statvfs_result(
                (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime - delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime - delta))

    def _test_utime(self, filename, attr, utime, delta):
        """Exercise the ways of calling utime on *filename*.

        *attr(stat_result, name)* reads a timestamp from a stat result,
        *utime(filename, times)* sets the (atime, mtime) pair, and *delta*
        is the tolerated difference, in the unit returned by *attr*,
        between two successive "set to now" calls.
        """
        # Issue #13327 removed the requirement to pass None as the
        # second argument. Check that the previous methods of passing
        # a time tuple or None work in addition to no argument.
        st0 = os.stat(filename)
        # Doesn't set anything new, but sets the time tuple way
        utime(filename, (attr(st0, "st_atime"), attr(st0, "st_mtime")))
        # Setting the time to the time you just read, then reading again,
        # should always return exactly the same times.
        st1 = os.stat(filename)
        self.assertEqual(attr(st0, "st_mtime"), attr(st1, "st_mtime"))
        self.assertEqual(attr(st0, "st_atime"), attr(st1, "st_atime"))
        # Set to the current time in the old explicit way.
        os.utime(filename, None)
        # Stat the path that was just touched, not the scratch directory,
        # so st2 and st3 describe the same object.
        st2 = os.stat(filename)
        # Set to the current time in the new way
        os.utime(filename)
        st3 = os.stat(filename)
        self.assertAlmostEqual(attr(st2, "st_mtime"), attr(st3, "st_mtime"),
                               delta=delta)

    def test_utime(self):
        def utime(file, times):
            return os.utime(file, times)
        self._test_utime(self.fname, getattr, utime, 10)
        self._test_utime(support.TESTFN, getattr, utime, 10)

    def _test_utime_ns(self, set_times_ns, test_dir=True):
        """Run _test_utime() against the st_?time_ns fields.

        *set_times_ns(filename, times)* sets the times in nanoseconds.
        The scratch directory is tested as well unless *test_dir* is
        false.
        """
        def getattr_ns(o, attr):
            return getattr(o, attr + "_ns")
        ten_s = 10 * 1000 * 1000 * 1000
        self._test_utime(self.fname, getattr_ns, set_times_ns, ten_s)
        if test_dir:
            self._test_utime(support.TESTFN, getattr_ns, set_times_ns, ten_s)

    def test_utime_ns(self):
        def utime_ns(file, times):
            return os.utime(file, ns=times)
        self._test_utime_ns(utime_ns)

    # Skip decorators keyed on what os.utime supports on this platform.
    requires_utime_dir_fd = unittest.skipUnless(
        os.utime in os.supports_dir_fd,
        "dir_fd support for utime required for this test.")
    requires_utime_fd = unittest.skipUnless(
        os.utime in os.supports_fd,
        "fd support for utime required for this test.")
    requires_utime_nofollow_symlinks = unittest.skipUnless(
        os.utime in os.supports_follow_symlinks,
        "follow_symlinks support for utime required for this test.")

    @requires_utime_nofollow_symlinks
    def test_lutimes_ns(self):
        def lutimes_ns(file, times):
            return os.utime(file, ns=times, follow_symlinks=False)
        self._test_utime_ns(lutimes_ns)

    @requires_utime_fd
    def test_futimes_ns(self):
        def futimes_ns(file, times):
            with open(file, "wb") as f:
                os.utime(f.fileno(), ns=times)
        self._test_utime_ns(futimes_ns, test_dir=False)

    def _utime_invalid_arguments(self, name, arg):
        """Passing both a times tuple and ns= to os.<name> must fail."""
        with self.assertRaises(ValueError):
            getattr(os, name)(arg, (5, 5), ns=(5, 5))

    def test_utime_invalid_arguments(self):
        self._utime_invalid_arguments('utime', self.fname)

    @unittest.skipUnless(stat_supports_subsecond,
                         "os.stat() doesn't has a subsecond resolution")
    def _test_utime_subsecond(self, set_time_func):
        """Set sub-second times on self.fname and read them back.

        *set_time_func(filename, atime, mtime)* performs the update; the
        times reported by os.stat() must match to three decimal places.
        """
        asec, amsec = 1, 901
        atime = asec + amsec * 1e-3
        msec, mmsec = 2, 901
        mtime = msec + mmsec * 1e-3
        filename = self.fname
        os.utime(filename, (0, 0))
        set_time_func(filename, atime, mtime)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.stat_float_times(True)
        st = os.stat(filename)
        self.assertAlmostEqual(st.st_atime, atime, places=3)
        self.assertAlmostEqual(st.st_mtime, mtime, places=3)

    def test_utime_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimes_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_fd
    def test_futimens_subsecond(self):
        def set_time(filename, atime, mtime):
            with open(filename, "wb") as f:
                os.utime(f.fileno(), times=(atime, mtime))
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_futimesat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                         times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    @requires_utime_nofollow_symlinks
    def test_lutimes_subsecond(self):
        def set_time(filename, atime, mtime):
            os.utime(filename, (atime, mtime), follow_symlinks=False)
        self._test_utime_subsecond(set_time)

    @requires_utime_dir_fd
    def test_utimensat_subsecond(self):
        def set_time(filename, atime, mtime):
            dirname = os.path.dirname(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                os.utime(os.path.basename(filename), dir_fd=dirfd,
                         times=(atime, mtime))
            finally:
                os.close(dirfd)
        self._test_utime_subsecond(set_time)

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            """Return the file system name of the volume holding *path*.

            Returns None when GetVolumeInformationW() reports failure.
            """
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None,
                                              None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000  # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == errno.ENOENT:
                    # file does not exist; cannot run test
                    self.skipTest(r"c:\pagefile.sys does not exist")
                self.fail("Could not stat pagefile.sys")

        @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
        def test_15261(self):
            # Verify that stat'ing a closed fd does not cause crash
            r, w = os.pipe()
            try:
                os.stat(r)          # should not raise error
            finally:
                os.close(r)
                os.close(w)
            with self.assertRaises(OSError) as ctx:
                os.stat(r)
            self.assertEqual(ctx.exception.errno, errno.EBADF)
488
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000489from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000490
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """check that os.environ object conform to mapping protocol"""
    type2test = None

    def setUp(self):
        # Snapshot the environment so tearDown() can restore it, then
        # load the reference mapping into os.environ.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        for key, value in self._reference().items():
            os.environ[key] = value

    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        """Return the key/value pairs installed into os.environ by setUp."""
        return {"KEY1": "VALUE1", "KEY2": "VALUE2", "KEY3": "VALUE3"}

    def _empty_mapping(self):
        """Clear os.environ and return it as the mapping under test."""
        os.environ.clear()
        return os.environ

    # Bug 1110478
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_update2(self):
        os.environ.clear()
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_os_popen_iter(self):
        with os.popen(
                "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
            it = iter(popen)
            self.assertEqual(next(it), "line1\n")
            self.assertEqual(next(it), "line2\n")
            self.assertEqual(next(it), "line3\n")
            self.assertRaises(StopIteration, next, it)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for key, val in os.environ.items():
            self.assertEqual(type(key), str)
            self.assertEqual(type(val), str)

    def test_items(self):
        for key, value in self._reference().items():
            self.assertEqual(os.environ.get(key), value)

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
            '{!r}: {!r}'.format(key, value)
            for key, value in env.items())))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily swap os.environ for a plain dict.
        saved_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = saved_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if os.supports_bytes_environ:
            # env cannot contain 'PATH' and b'PATH' keys
            try:
                # ignore BytesWarning warning
                with warnings.catch_warnings(record=True):
                    mixed_env = {'PATH': '1', b'PATH': b'2'}
            except BytesWarning:
                # mixed_env cannot be created with python -bb
                pass
            else:
                self.assertRaises(ValueError, os.get_exec_path, mixed_env)

            # bytes key and/or value
            self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
                                     ['abc'])
            self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
                                     ['abc'])
            self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
                                     ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(sys.getfilesystemencoding(),
                                       'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(sys.getfilesystemencoding(),
                                 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @support.requires_freebsd_version(7)
    @support.requires_mac_ver(10, 6)
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100631
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def setUp(self):
        # Only the fixture is built here; the assertions live in the test_*
        # methods so that failures are reported as test failures and
        # tearDown() always gets a chance to remove the tree.
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        tmp1_path = join(self.walk_path, "tmp1")
        tmp2_path = join(self.sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        self.link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
        broken_link_path = join(sub2_path, "broken_link")

        # Create stuff.
        os.makedirs(self.sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if support.can_symlink():
            if os.name == 'nt':
                def symlink_to_dir(src, dest):
                    os.symlink(src, dest, True)
            else:
                symlink_to_dir = os.symlink
            symlink_to_dir(os.path.abspath(t2_path), self.link_path)
            symlink_to_dir('broken', broken_link_path)
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        # Walk top-down.
        walked = list(os.walk(self.walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        walked[3 - 2 * flipped][-1].sort()
        self.assertEqual(walked[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], self.sub2_tree)

    def test_walk_prune(self):
        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(self.walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (self.walk_path, ["SUB2"], ["tmp1"]))
        walked[1][-1].sort()
        self.assertEqual(walked[1], self.sub2_tree)

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        walked = list(os.walk(self.walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        walked[2 - 2 * flipped][-1].sort()
        self.assertEqual(walked[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (self.sub11_path, [], []))
        self.assertEqual(walked[flipped + 1],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")
        # Walk, following symlinks.
        for root, dirs, files in os.walk(self.walk_path, followlinks=True):
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is listed in dirs but must be
                # removed like a file.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000751
Charles-François Natali7372b062012-02-05 15:15:38 +0100752
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Every combination of topdown/followlinks is tried.  The keyword
        dicts passed in are not modified.
        """
        for topdown, followlinks in itertools.product((True, False), repeat=2):
            flags = {'topdown': topdown, 'followlinks': followlinks}
            # Work on copies: callers may pass the very same dict for both
            # arguments and should not see it change under them.
            walk_args = dict(walk_kwargs, **flags)
            fwalk_args = dict(fwalk_kwargs, **flags)

            expected = {}
            for root, dirs, files in os.walk(**walk_args):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_args):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide that error behind an unbound-name error.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, followlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None, followlinks
            for root, dirs, files, rootfd in os.fwalk(*args):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)),
                                 set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)

    def tearDown(self):
        # cleanup
        for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
            for name in files:
                os.unlink(name, dir_fd=rootfd)
            for name in dirs:
                # Entries in dirs may be symlinks to directories; those are
                # unlinked rather than rmdir'ed.
                st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
                if stat.S_ISDIR(st.st_mode):
                    os.rmdir(name, dir_fd=rootfd)
                else:
                    os.unlink(name, dir_fd=rootfd)
        os.rmdir(support.TESTFN)
825
826
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs()."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when an assertion fails, so a
        # failure here cannot leak into later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            with self.assertRaises(OSError):
                # Should fail when the bit is not already set when demanded.
                os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even if an assertion fails: tearDown() calls
        # os.removedirs(), which cannot remove a regular file.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
903
class DevNullTests(unittest.TestCase):
    """Tests for os.devnull."""

    def test_devnull(self):
        # Writes to the null device are accepted; the with statement closes
        # (and therefore flushes) the file, so no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        # Reading from the null device yields nothing.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000911
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes of os.urandom() output from a child interpreter.

        The child writes the raw bytes to its stdout; the length of what
        comes back is checked against *count* before it is returned.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare against the requested size rather than a fixed 16 so the
        # helper stays correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +0000940
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000941@contextlib.contextmanager
942def _execvpe_mockup(defpath=None):
943 """
944 Stubs out execv and execve functions when used as context manager.
945 Records exec calls. The mock execv and execve functions always raise an
946 exception as they would normally never return.
947 """
948 # A list of tuples containing (function name, first arg, args)
949 # of calls to execv or execve that have been made.
950 calls = []
951
952 def mock_execv(name, *args):
953 calls.append(('execv', name, args))
954 raise RuntimeError("execv called")
955
956 def mock_execve(name, *args):
957 calls.append(('execve', name, args))
958 raise OSError(errno.ENOTDIR, "execve called")
959
960 try:
961 orig_execv = os.execv
962 orig_execve = os.execve
963 orig_defpath = os.defpath
964 os.execv = mock_execv
965 os.execve = mock_execve
966 if defpath is not None:
967 os.defpath = defpath
968 yield calls
969 finally:
970 os.execv = orig_execv
971 os.execve = orig_execve
972 os.defpath = orig_defpath
973
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        self.assertRaises(OSError, os.execvpe, 'no such app-',
                          ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        # Drive os._execvpe() with the exec functions stubbed out and check
        # which stub it calls, and with which arguments.  *test_type* is
        # either str or bytes and selects the type of the program name.
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            if os.name != "nt":
                native_fullpath = os.fsencode(fullpath)
            else:
                native_fullpath = fullpath
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            self.assertRaises(RuntimeError,
                os._execvpe, fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            self.assertRaises(OSError,
                os._execvpe, program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(calls[0],
                ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            if test_type is bytes:
                env_path[b'PATH'] = program_path
            else:
                env_path['PATH'] = program_path
            self.assertRaises(OSError,
                os._execvpe, program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(calls[0],
                ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        self._test_internal_execvpe(str)

    # Kept as a separate test so that a failure of the str variant cannot
    # prevent the bytes variant from running (and vice versa).
    @unittest.skipIf(os.name == "nt",
                     "the bytes variant is not run on Windows")
    def test_internal_execvpe_bytes(self):
        self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001037
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001038
class Win32ErrorTests(unittest.TestCase):
    """Each operation below must raise WindowsError for its TESTFN argument."""

    def test_rename(self):
        with self.assertRaises(WindowsError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir() is attempted on a name taken by a file that is still open;
        # the file is closed and removed afterwards whatever the outcome.
        handle = open(support.TESTFN, "w")
        try:
            with self.assertRaises(WindowsError):
                os.mkdir(support.TESTFN)
        finally:
            handle.close()
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001062
class TestInvalidFD(unittest.TestCase):
    """Check how fd-based os functions react to an invalid file descriptor.

    Functions that the platform does not provide are reported as skipped
    instead of silently passing.
    """

    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms
    def get_single(f):
        # Build the test method for os.<f>, a function taking only an fd.
        def helper(self):
            if not hasattr(os, f):
                self.skipTest("test needs os.%s()" % f)
            self.check(getattr(os, f))
        return helper
    # Inject one test_<name> method per entry into the class namespace.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f with a bad descriptor (followed by *args) and require an
        # OSError whose errno is EBADF.
        try:
            f(support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise a OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'),
                         'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        for i in range(10):
            try:
                os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'),
                         'test needs os.fpathconf()')
    def test_fpathconf(self):
        self.check(os.pathconf, "PC_NAME_MAX")
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'),
                         'test needs os.ftruncate()')
    def test_ftruncate(self):
        self.check(os.truncate, 0)
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'),
                         'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001142
Brian Curtin1b9df392010-11-24 20:24:31 +00001143
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files a test managed to create.
        for name in (self.file1, self.file2):
            if os.path.exists(name):
                os.unlink(name)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it as file2 and check that both names
        # refer to the same open file.
        with open(file1, "w") as src:
            src.write("test")

        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            os.link(file1, file2)
        with open(file1, "r") as first, open(file2, "r") as second:
            same = os.path.sameopenfile(first.fileno(), second.fileno())
            self.assertTrue(same)

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        encoding = sys.getfilesystemencoding()
        self._test_link(self.file1.encode(encoding),
                        self.file2.encode(encoding))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            self.skipTest("Unable to encode for this platform.")

        # Rebind the attributes so that tearDown() removes the new names.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1180
if sys.platform != 'win32':
    # Off Windows the Win32-only tests are replaced by an empty stub.
    class Win32ErrorTests(unittest.TestCase):
        pass

    class PosixUidGidTests(unittest.TestCase):
        # Each test is only defined when the os function exists.  The
        # permission check is skipped when running as uid 0; the overflow
        # check is always made.
        if hasattr(os, 'setuid'):
            def test_setuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setuid(0)
                with self.assertRaises(OverflowError):
                    os.setuid(1<<32)

        if hasattr(os, 'setgid'):
            def test_setgid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setgid(0)
                with self.assertRaises(OverflowError):
                    os.setgid(1<<32)

        if hasattr(os, 'seteuid'):
            def test_seteuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.seteuid(0)
                with self.assertRaises(OverflowError):
                    os.seteuid(1<<32)

        if hasattr(os, 'setegid'):
            def test_setegid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setegid(0)
                with self.assertRaises(OverflowError):
                    os.setegid(1<<32)

        if hasattr(os, 'setreuid'):
            def test_setreuid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setreuid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setreuid(0, 1<<32)

            def test_setreuid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                child_code = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
                subprocess.check_call([sys.executable, '-c', child_code])

        if hasattr(os, 'setregid'):
            def test_setregid(self):
                if os.getuid() != 0:
                    with self.assertRaises(os.error):
                        os.setregid(0, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(1<<32, 0)
                with self.assertRaises(OverflowError):
                    os.setregid(0, 1<<32)

            def test_setregid_neg1(self):
                # Needs to accept -1.  We run this in a subprocess to avoid
                # altering the test runner's process state (issue8045).
                child_code = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
                subprocess.check_call([sys.executable, '-c', child_code])

    class Pep383Tests(unittest.TestCase):
        def setUp(self):
            # Prefer the unencodable test name for the directory when the
            # platform provides one.
            if support.TESTFN_UNENCODABLE:
                self.dir = support.TESTFN_UNENCODABLE
            else:
                self.dir = support.TESTFN
            self.bdir = os.fsencode(self.dir)

            # Collect the candidate file names that can be encoded to bytes.
            candidates = [support.TESTFN_UNICODE]
            if support.TESTFN_UNENCODABLE:
                candidates.append(support.TESTFN_UNENCODABLE)
            encoded_names = []
            for candidate in candidates:
                try:
                    encoded_names.append(os.fsencode(candidate))
                except UnicodeEncodeError:
                    continue
            if not encoded_names:
                self.skipTest("couldn't create any non-ascii filename")

            self.unicodefn = set()
            os.mkdir(self.dir)
            try:
                for raw_name in encoded_names:
                    support.create_empty_file(os.path.join(self.bdir, raw_name))
                    decoded = os.fsdecode(raw_name)
                    if decoded in self.unicodefn:
                        raise ValueError("duplicate filename")
                    self.unicodefn.add(decoded)
            except:
                # Don't leave a half-built directory behind, then re-raise.
                shutil.rmtree(self.dir)
                raise

        def tearDown(self):
            shutil.rmtree(self.dir)

        def test_listdir(self):
            expected = self.unicodefn
            found = set(os.listdir(self.dir))
            self.assertEqual(found, expected)
            # test listdir without arguments
            current_directory = os.getcwd()
            try:
                os.chdir(os.sep)
                self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
            finally:
                os.chdir(current_directory)

        def test_open(self):
            # Every created file must be openable through its str name.
            for fn in self.unicodefn:
                with open(os.path.join(self.dir, fn), 'rb'):
                    pass

        def test_stat(self):
            for fn in self.unicodefn:
                os.stat(os.path.join(self.dir, fn))
else:
    # On Windows the POSIX-only tests are replaced by empty stubs.
    class PosixUidGidTests(unittest.TestCase):
        pass
    class Pep383Tests(unittest.TestCase):
        pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001300
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Exercise os.kill() on Windows with plain exit codes and console
    control events."""

    def _kill(self, sig):
        """Kill a ready child interpreter with *sig* and check its exit code.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready.  When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting.  This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for at most max_tries * 0.1 seconds.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop ends without a break: either the tries
            # ran out or the child exited before writing anything.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a helper script and require it to
        exit.

        *name* is only used in the failure message.  The helper is started
        in its own process group and receives the tag name of a one-byte
        shared mapping; the test waits for that byte to become 1
        (presumably written by win_console_handler.py once it is ready)
        before delivering the event.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping whatever the outcome of the test.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if it is still around; it may
            # already have exited, in which case there is nothing to kill.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is running; an exit code
        # of 0 is falsy too, so compare against None explicitly.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it.  This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1415
1416
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Symbolic-link handling of os and os.path on Windows.

    Links are created with relative names, i.e. in the current working
    directory, and removed again in tearDown().
    """

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions are checked with assert* methods rather than bare
        # assert statements so that they still run under "python -O".
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() because the target of this link does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink, True)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not,
        for both the str and the bytes spelling of *link*."""
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        # DeprecationWarning is silenced while bytes paths are used.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertEqual(os.stat(bytes_link), os.stat(target))
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        """os.stat() through a relative link must work from the link's own
        directory and from the directories above and below it."""
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Bound before the try block so that no later code can hit an
        # unbound name when directory creation fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Removing the tree also removes file1 and the link.  Skip the
            # removal when nothing was created so that a cleanup error does
            # not hide the original failure.
            if os.path.lexists(level1):
                shutil.rmtree(level1)
1534
Brian Curtind40e6f72010-07-08 21:39:08 +00001535
class FSEncodingTests(unittest.TestCase):
    """Sanity checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # A value that already has the result type comes back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Name cannot be encoded here; nothing to round-trip.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001549
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001550
Brett Cannonefb00c02012-02-29 18:31:31 -05001551
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    # Evaluated once, when the class body runs.
    _has_codeset = (hasattr(locale, 'nl_langinfo') and
                    hasattr(locale, 'CODESET'))
    _tty_with_encoding = os.isatty(0) and (
        sys.platform.startswith('win') or _has_codeset)

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        _tty_with_encoding,
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The reported name must be one the codecs registry can resolve.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
1565
1566
class PidTests(unittest.TestCase):
    """Checks for the process-id functions of os."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid, which has to be our own pid
        # because we are the parent of our subprocess.
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        self.assertEqual(int(output), os.getpid())
1576
1577
# When this TestCase was introduced it produced at least two different
# errors on *nix buildbots, so it is skipped unconditionally for the time
# being to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Checks for os.getlogin()."""

    def test_getlogin(self):
        # Only require that some non-empty name is reported.
        login = os.getlogin()
        name_length = len(login)
        self.assertNotEqual(name_length, 0)
1586
1587
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own priority value by one, read it back, then try to
        # restore the starting value.
        base = os.getpriority(os.PRIO_PROCESS, os.getpid())
        os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
            if base >= 19 and new_prio <= 19:
                self.skipTest(
                    "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            try:
                os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
            except OSError as exc:
                # Only EACCES is tolerated while restoring the old value;
                # every other error is propagated.
                if exc.errno != errno.EACCES:
                    raise
1610
1611
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening asyncore dispatcher that runs its loop in its own
        thread and collects everything the accepted client sends."""

        class Handler(asynchat.async_chat):
            """Per-connection channel: greets the peer and buffers all
            incoming data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return all bytes received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled instead of
                # letting asyncore deal with it.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # The address actually bound (the port may have been passed
            # as 0).
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the server thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to finish and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # "with" releases the lock even if the loop iteration
                # raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1695
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001696
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Send a file to a local SendfileTestServer with os.sendfile() and
    compare what the server received with what was sent."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers tests below are only defined when this is true.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))

    @classmethod
    def setUpClass(cls):
        with open(support.TESTFN, "wb") as f:
            f.write(cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=None,
                         trailers=None):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        The call is retried for as long as it fails with EAGAIN or EBUSY;
        any other OSError is propagated.  *headers* and *trailers* default
        to empty lists and are only passed on when
        SUPPORT_HEADERS_TRAILERS is true.  Returns whatever os.sendfile()
        returns.
        """
        headers = [] if headers is None else headers
        trailers = [] if trailers is None else trailers
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                # disconnected (ECONNRESET) or any other error
                raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare lengths first so that a size mismatch gives a short
        # failure message.
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    # --- headers / trailers tests

    if SUPPORT_HEADERS_TRAILERS:

        def test_headers(self):
            total_sent = 0
            sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                               headers=[b"x" * 512])
            total_sent += sent
            offset = 4096
            nbytes = 4096
            while True:
                sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                             offset, nbytes)
                if sent == 0:
                    break
                total_sent += sent
                offset += sent

            expected_data = b"x" * 512 + self.DATA
            self.assertEqual(total_sent, len(expected_data))
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            # Compare the payload itself, as the other tests do, rather
            # than only the hashes of the two byte strings.
            self.assertEqual(len(data), len(expected_data))
            self.assertEqual(data, expected_data)

        def test_trailers(self):
            TESTFN2 = support.TESTFN + "2"
            # Registered before the file is created so that it is removed
            # even when a later step fails; support.unlink() is the same
            # helper tearDownClass() uses for the main test file.
            self.addCleanup(support.unlink, TESTFN2)
            with open(TESTFN2, 'wb') as f:
                f.write(b"abcde")
            with open(TESTFN2, 'rb') as f:
                os.sendfile(self.sockno, f.fileno(), 0, 4096,
                            trailers=[b"12345"])
                self.client.close()
                self.server.wait()
                data = self.server.handler_instance.get_data()
                self.assertEqual(data, b"abcde12345")

    if hasattr(os, "SF_NODISKIO"):
        def test_flags(self):
            try:
                os.sendfile(self.sockno, self.fileno, 0, 4096,
                            flags=os.SF_NODISKIO)
            except OSError as err:
                # EBUSY and EAGAIN are accepted outcomes for this flag.
                if err.errno not in (errno.EBUSY, errno.EAGAIN):
                    raise
1868
1869
def supports_extended_attributes():
    """Return True if extended attributes can be used by the tests.

    That requires os.setxattr to exist, a successful trial call on a
    scratch file (support.TESTFN, removed again afterwards), and a
    platform release that is not a 2.6.x kernel older than 2.6.39.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.setxattr(fp.fileno(), b"user.test", b"")
            except OSError:
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string with escaped dots: "\d" is not a valid str escape, and a
    # bare "." would match any character rather than a literal dot.
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
1885
1886
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Run one get/set/remove/list scenario against several spellings of
    the extended attribute functions."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        """Run the scenario on a fresh support.TESTFN.

        *s* converts an attribute name given as str into the type under
        test; the four callables are called as func(path, ...) and, except
        for *listxattr*, receive **kwargs.
        """
        path = support.TESTFN
        open(path, "wb").close()

        def expect_errno(code, func, *args):
            # func(path, *args, **kwargs) must fail with OSError errno *code*.
            with self.assertRaises(OSError) as caught:
                func(path, *args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        # Nothing is set yet.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))
        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create the attribute empty, then replace its value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name and XATTR_REPLACE on a missing
        # name both have to fail.
        expect_errno(errno.EEXIST, setxattr,
                     s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr,
                     s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)

        # Removing one attribute leaves the other one untouched.
        removexattr(path, s("user.test"), **kwargs)
        expect_errno(errno.ENODATA, getxattr, s("user.test"))
        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # A 1024-byte value round-trips.
        setxattr(path, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), b"a"*1024)
        removexattr(path, s("user.test"), **kwargs)

        # One hundred attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for attr_name in many:
            setxattr(path, attr_name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # First with str attribute names, then, on a fresh file, with the
        # same names encoded to ASCII bytes.
        self._check_xattrs_str(str, *args, **kwargs)
        support.unlink(support.TESTFN)
        self._check_xattrs_str(lambda name: bytes(name, "ascii"),
                               *args, **kwargs)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but every call goes through a file descriptor
        # opened just for that call.
        def fd_get(path, *args):
            with open(path, "rb") as stream:
                return os.getxattr(stream.fileno(), *args)

        def fd_set(path, *args):
            with open(path, "wb") as stream:
                os.setxattr(stream.fileno(), *args)

        def fd_remove(path, *args):
            with open(path, "wb") as stream:
                os.removexattr(stream.fileno(), *args)

        def fd_list(path, *args):
            with open(path, "rb") as stream:
                return os.listxattr(stream.fileno(), *args)

        self._check_xattrs(fd_get, fd_set, fd_remove, fd_list)
1962
1963
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Bytes filenames must trigger DeprecationWarning on Windows."""

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # Each entry is (function, *arguments).
        calls = (
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        )
        with warnings.catch_warnings():
            # Turn the warning into an exception so assertRaises sees it.
            warnings.simplefilter("error", DeprecationWarning)
            for call in calls:
                self.assertRaises(DeprecationWarning, *call)

    @support.skip_unless_symlink
    def test_symlink(self):
        bytes_name = os.fsencode(support.TESTFN)
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            self.assertRaises(DeprecationWarning,
                              os.symlink, bytes_name, bytes_name)
1999
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002000
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            skippable = (sys.platform == "win32" or
                         exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not skippable:
                raise
            self.skipTest("failed to query terminal size")

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            fields = subprocess.check_output(['stty', 'size']).decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        # The two stty fields are swapped to form the expected tuple
        # (reversed order).
        expected = (int(fields[1]), int(fields[0]))

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as exc:
            # Under win32 a generic OSError can be thrown if the
            # handle cannot be retrieved
            skippable = (sys.platform == "win32" or
                         exc.errno in (errno.EINVAL, errno.ENOTTY))
            if not skippable:
                raise
            self.skipTest("failed to query terminal size")
        self.assertEqual(expected, actual)
2043
2044
@support.reap_threads
def test_main():
    """Pass the test case classes listed below, in order, to
    support.run_unittest()."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        FwalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        DeviceEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
        Win32DeprecatedBytesAPI,
        TermsizeTests,
    )
    support.run_unittest(*test_classes)
Fred Drake2e2be372001-09-20 21:33:42 +00002074
# When executed directly as a script, run the suite via test_main().
if __name__ == "__main__":
    test_main()