blob: 569b2184a963fd73ce6f919d6b7d0d0383fd360c [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
Benjamin Peterson799bd802011-08-31 22:15:17 -040017import platform
18import re
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000019import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import asyncore
21import asynchat
22import socket
23try:
24 import threading
25except ImportError:
26 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000027
# The (now outdated and unmaintained) linuxthreads threading library has an
# issue when it is combined with a failed execv call; see
# http://bugs.python.org/issue4970.  Record whether this interpreter runs
# on top of it so that the affected tests can be skipped.
USING_LINUXTHREADS = (
    hasattr(sys, 'thread_info')
    and bool(sys.thread_info.version)
    and sys.thread_info.version.startswith("linuxthreads")
)
Brian Curtineb24d742010-04-12 17:16:38 +000036
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    # Tests for the file-descriptor level functions of the os module.
    # Each test works on support.TESTFN, which is removed both before and
    # after every test (tearDown is an alias of setUp).

    def setUp(self):
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        # A file we have just created must be reported as writable.
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A rename() call rejected with TypeError must leave the reference
        # count of the path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        # os.read() returns exactly the bytes previously written.
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion or a write above
            # fails, so that a failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run the command given by *args* and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only, wrap the descriptor with os.fdopen(*args)
        # and close the resulting file object again.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Make sure TESTFN exists before fdopen_helper opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200132
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    def setUp(self):
        # Create the directory TESTFN holding a three-byte file "f1".
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        # Shared body of the stat tests; *fname* may be a str or bytes path.
        if not hasattr(os, "stat"):
            return

        import stat
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                value = getattr(result, attr)
                if name.endswith("TIME"):
                    # Time attributes are truncated to int before being
                    # compared with the corresponding tuple slot.
                    value = int(value)
                self.assertEqual(value, result[getattr(stat, name)])
                self.assertIn(attr, members)

        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  Only TypeError is
        # tolerated here; successful construction is accepted as well.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        if not hasattr(os, "statvfs"):
            return

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                return
            # Any other error is a genuine failure: propagate it instead of
            # falling through to a NameError on the unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  Only TypeError is
        # tolerated here; successful construction is accepted as well.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    # NOTE(review): the nesting of the definitions below is assumed to be
    # win32 -> (NTFS-only: test_1565150, test_large_time) and test_1686475
    # directly under the win32 check -- confirm against the upstream file.
    if sys.platform == 'win32':
        def get_file_system(path):
            # Return the file system name of the volume holding *path*, or
            # None when GetVolumeInformationW reports failure.
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
303
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000304from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000305
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None

    def setUp(self):
        # Snapshot the environment (str and, where supported, bytes view)
        # and then install the reference entries.
        self.__save = dict(os.environ)
        if os.supports_bytes_environ:
            self.__saveb = dict(os.environb)
        reference = self._reference()
        for key in reference:
            os.environ[key] = reference[key]

    def tearDown(self):
        # Put back the snapshots taken in setUp.
        os.environ.clear()
        os.environ.update(self.__save)
        if os.supports_bytes_environ:
            os.environb.clear()
            os.environb.update(self.__saveb)

    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}

    def _empty_mapping(self):
        os.environ.clear()
        return os.environ

    # Bug 1110478
    def test_update2(self):
        os.environ.clear()
        if not os.path.exists("/bin/sh"):
            return
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as pipe:
            output = pipe.read().strip()
            self.assertEqual(output, "World")

    def test_os_popen_iter(self):
        if not os.path.exists("/bin/sh"):
            return
        command = "/bin/sh -c 'echo \"line1\nline2\nline3\"'"
        with os.popen(command) as pipe:
            lines = iter(pipe)
            for expected in ("line1\n", "line2\n", "line3\n"):
                self.assertEqual(next(lines), expected)
            self.assertRaises(StopIteration, next, lines)

    # Verify environ keys and values from the OS are of the
    # correct str type.
    def test_keyvalue_types(self):
        for name, content in os.environ.items():
            self.assertEqual(type(name), str)
            self.assertEqual(type(content), str)

    def test_items(self):
        reference = self._reference()
        for name in reference:
            self.assertEqual(os.environ.get(name), reference[name])

    # Issue 7310
    def test___repr__(self):
        """Check that the repr() of os.environ looks like environ({...})."""
        env = os.environ
        actual = repr(env)
        pairs = ['{!r}: {!r}'.format(key, value)
                 for key, value in env.items()]
        self.assertEqual(actual, 'environ({{{}}})'.format(', '.join(pairs)))

    def test_get_exec_path(self):
        defpath_list = os.defpath.split(os.pathsep)
        test_path = ['/monty', '/python', '', '/flying/circus']
        test_env = {'PATH': os.pathsep.join(test_path)}

        # Temporarily swap os.environ for a plain dict.
        real_environ = os.environ
        try:
            os.environ = dict(test_env)
            # Test that defaulting to os.environ works.
            self.assertSequenceEqual(test_path, os.get_exec_path())
            self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
        finally:
            os.environ = real_environ

        # No PATH environment variable
        self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
        # Empty PATH environment variable
        self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
        # Supplied PATH environment variable
        self.assertSequenceEqual(test_path, os.get_exec_path(test_env))

        if not os.supports_bytes_environ:
            return

        # env cannot contain 'PATH' and b'PATH' keys
        try:
            # ignore BytesWarning warning
            with warnings.catch_warnings(record=True):
                mixed_env = {'PATH': '1', b'PATH': b'2'}
        except BytesWarning:
            # mixed_env cannot be created with python -bb
            pass
        else:
            self.assertRaises(ValueError, os.get_exec_path, mixed_env)

        # bytes key and/or value
        for env in ({b'PATH': b'abc'}, {b'PATH': 'abc'}, {'PATH': b'abc'}):
            self.assertSequenceEqual(os.get_exec_path(env), ['abc'])

    @unittest.skipUnless(os.supports_bytes_environ,
                         "os.environb required for this test.")
    def test_environb(self):
        encoding = sys.getfilesystemencoding()

        # os.environ -> os.environb
        value = 'euro\u20ac'
        try:
            value_bytes = value.encode(encoding, 'surrogateescape')
        except UnicodeEncodeError:
            msg = "U+20AC character is not encodable to %s" % (
                sys.getfilesystemencoding(),)
            self.skipTest(msg)
        os.environ['unicode'] = value
        self.assertEqual(os.environ['unicode'], value)
        self.assertEqual(os.environb[b'unicode'], value_bytes)

        # os.environb -> os.environ
        value = b'\xff'
        os.environb[b'bytes'] = value
        self.assertEqual(os.environb[b'bytes'], value)
        value_str = value.decode(encoding, 'surrogateescape')
        self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000432
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        for directory in (sub11_path, sub2_path, t2_path):
            os.makedirs(directory)
        for path in (tmp1_path, tmp2_path, tmp3_path, tmp4_path):
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        visited = list(os.walk(walk_path))
        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = visited[0][1][0] != "SUB1"
        visited[0][1].sort()
        self.assertEqual(visited[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[1 + flipped],
                         (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 + flipped], (sub11_path, [], []))
        self.assertEqual(visited[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        visited = []
        for root, dirs, files in os.walk(walk_path):
            visited.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to
                # visited!
                dirs.remove('SUB1')
        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(visited[1], sub2_tree)

        # Walk bottom-up.
        visited = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = visited[3][1][0] != "SUB1"
        visited[3][1].sort()
        self.assertEqual(visited[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[flipped], (sub11_path, [], []))
        self.assertEqual(visited[flipped + 1],
                         (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is removed like a file.
                if os.path.islink(dirname):
                    os.remove(dirname)
                else:
                    os.rmdir(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000540
class MakedirTests(unittest.TestCase):
    # Tests for os.makedirs(); every test works below the directory TESTFN
    # created by setUp and removed again by tearDown.

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # The umask is process-wide state: restore it even when one of
            # the checks above fails.
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Remove the file even on failure; tearDown calls
            # os.removedirs(), which only removes directories.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
592
class DevNullTests(unittest.TestCase):
    def test_devnull(self):
        # Writing to os.devnull must succeed, and reading from it must
        # yield no data.  The 'with' blocks close the files, so no explicit
        # close() call is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000600
class URandomTests(unittest.TestCase):
    def test_urandom(self):
        # os.urandom(n) must return exactly n bytes.  If the function raises
        # NotImplementedError, report the test as skipped rather than
        # letting it silently count as a pass.
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            self.skipTest("os.urandom() is not implemented on this platform")
610
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000611@contextlib.contextmanager
612def _execvpe_mockup(defpath=None):
613 """
614 Stubs out execv and execve functions when used as context manager.
615 Records exec calls. The mock execv and execve functions always raise an
616 exception as they would normally never return.
617 """
618 # A list of tuples containing (function name, first arg, args)
619 # of calls to execv or execve that have been made.
620 calls = []
621
622 def mock_execv(name, *args):
623 calls.append(('execv', name, args))
624 raise RuntimeError("execv called")
625
626 def mock_execve(name, *args):
627 calls.append(('execve', name, args))
628 raise OSError(errno.ENOTDIR, "execve called")
629
630 try:
631 orig_execv = os.execv
632 orig_execve = os.execve
633 orig_defpath = os.defpath
634 os.execv = mock_execv
635 os.execve = mock_execve
636 if defpath is not None:
637 os.defpath = defpath
638 yield calls
639 finally:
640 os.execv = orig_execv
641 os.execve = orig_execve
642 os.defpath = orig_defpath
643
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        # A program name that cannot be executed must raise OSError.
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list must raise ValueError.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Exercise os._execvpe() with a str or bytes program name.

        *test_type* is ``str`` or ``bytes`` and selects the type of the
        program name and full path handed to os._execvpe().  The exec
        functions are replaced by _execvpe_mockup(), so nothing is executed;
        the recorded calls are inspected instead.
        """
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            arguments = [b'progname', 'arg1', 'arg2']
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            # Everywhere except Windows the recorded path is expected in
            # its fsencode()d form.
            if os.name == "nt":
                native_fullpath = fullpath
            else:
                native_fullpath = os.fsencode(fullpath)
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        # The bytes variant is not run on Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000707
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000708
class Win32ErrorTests(unittest.TestCase):
    """Check that failing os calls on support.TESTFN raise WindowsError."""

    def test_rename(self):
        with self.assertRaises(WindowsError):
            os.rename(support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(support.TESTFN)

    def test_mkdir(self):
        # mkdir must fail while a file of the same name exists; the file is
        # closed before it is unlinked.
        handle = open(support.TESTFN, "w")
        try:
            with handle:
                with self.assertRaises(WindowsError):
                    os.mkdir(support.TESTFN)
        finally:
            os.unlink(support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000732
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000733class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000734 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000735 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
736 #singles.append("close")
737 #We omit close because it doesn'r raise an exception on some platforms
738 def get_single(f):
739 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000740 if hasattr(os, f):
741 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000742 return helper
743 for f in singles:
744 locals()["test_"+f] = get_single(f)
745
Benjamin Peterson7522c742009-01-19 21:00:09 +0000746 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000747 try:
748 f(support.make_bad_fd(), *args)
749 except OSError as e:
750 self.assertEqual(e.errno, errno.EBADF)
751 else:
752 self.fail("%r didn't raise a OSError with a bad file descriptor"
753 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000754
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000755 def test_isatty(self):
756 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000757 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000758
759 def test_closerange(self):
760 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000761 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000762 # Make sure none of the descriptors we are about to close are
763 # currently valid (issue 6542).
764 for i in range(10):
765 try: os.fstat(fd+i)
766 except OSError:
767 pass
768 else:
769 break
770 if i < 2:
771 raise unittest.SkipTest(
772 "Unable to acquire a range of invalid file descriptors")
773 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000774
775 def test_dup2(self):
776 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000777 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000778
779 def test_fchmod(self):
780 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000781 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000782
783 def test_fchown(self):
784 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000785 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000786
787 def test_fpathconf(self):
788 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000789 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000790
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000791 def test_ftruncate(self):
792 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000793 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000794
795 def test_lseek(self):
796 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000797 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000798
799 def test_read(self):
800 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000801 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000802
803 def test_tcsetpgrpt(self):
804 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000805 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000806
807 def test_write(self):
808 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000809 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000810
Brian Curtin1b9df392010-11-24 20:24:31 +0000811
812class LinkTests(unittest.TestCase):
813 def setUp(self):
814 self.file1 = support.TESTFN
815 self.file2 = os.path.join(support.TESTFN + "2")
816
Brian Curtinc0abc4e2010-11-30 23:46:54 +0000817 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +0000818 for file in (self.file1, self.file2):
819 if os.path.exists(file):
820 os.unlink(file)
821
Brian Curtin1b9df392010-11-24 20:24:31 +0000822 def _test_link(self, file1, file2):
823 with open(file1, "w") as f1:
824 f1.write("test")
825
826 os.link(file1, file2)
827 with open(file1, "r") as f1, open(file2, "r") as f2:
828 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
829
830 def test_link(self):
831 self._test_link(self.file1, self.file2)
832
833 def test_link_bytes(self):
834 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
835 bytes(self.file2, sys.getfilesystemencoding()))
836
Brian Curtinf498b752010-11-30 15:54:04 +0000837 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +0000838 try:
Brian Curtinf498b752010-11-30 15:54:04 +0000839 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +0000840 except UnicodeError:
841 raise unittest.SkipTest("Unable to encode for this platform.")
842
Brian Curtinf498b752010-11-30 15:54:04 +0000843 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +0000844 self.file2 = self.file1 + "2"
845 self._test_link(self.file1, self.file2)
846
Thomas Wouters477c8d52006-05-27 19:21:47 +0000847if sys.platform != 'win32':
848 class Win32ErrorTests(unittest.TestCase):
849 pass
850
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000851 class PosixUidGidTests(unittest.TestCase):
852 if hasattr(os, 'setuid'):
853 def test_setuid(self):
854 if os.getuid() != 0:
855 self.assertRaises(os.error, os.setuid, 0)
856 self.assertRaises(OverflowError, os.setuid, 1<<32)
857
858 if hasattr(os, 'setgid'):
859 def test_setgid(self):
860 if os.getuid() != 0:
861 self.assertRaises(os.error, os.setgid, 0)
862 self.assertRaises(OverflowError, os.setgid, 1<<32)
863
864 if hasattr(os, 'seteuid'):
865 def test_seteuid(self):
866 if os.getuid() != 0:
867 self.assertRaises(os.error, os.seteuid, 0)
868 self.assertRaises(OverflowError, os.seteuid, 1<<32)
869
870 if hasattr(os, 'setegid'):
871 def test_setegid(self):
872 if os.getuid() != 0:
873 self.assertRaises(os.error, os.setegid, 0)
874 self.assertRaises(OverflowError, os.setegid, 1<<32)
875
876 if hasattr(os, 'setreuid'):
877 def test_setreuid(self):
878 if os.getuid() != 0:
879 self.assertRaises(os.error, os.setreuid, 0, 0)
880 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
881 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000882
883 def test_setreuid_neg1(self):
884 # Needs to accept -1. We run this in a subprocess to avoid
885 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000886 subprocess.check_call([
887 sys.executable, '-c',
888 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000889
890 if hasattr(os, 'setregid'):
891 def test_setregid(self):
892 if os.getuid() != 0:
893 self.assertRaises(os.error, os.setregid, 0, 0)
894 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
895 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000896
897 def test_setregid_neg1(self):
898 # Needs to accept -1. We run this in a subprocess to avoid
899 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000900 subprocess.check_call([
901 sys.executable, '-c',
902 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +0000903
904 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +0000905 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000906 if support.TESTFN_UNENCODABLE:
907 self.dir = support.TESTFN_UNENCODABLE
908 else:
909 self.dir = support.TESTFN
910 self.bdir = os.fsencode(self.dir)
911
912 bytesfn = []
913 def add_filename(fn):
914 try:
915 fn = os.fsencode(fn)
916 except UnicodeEncodeError:
917 return
918 bytesfn.append(fn)
919 add_filename(support.TESTFN_UNICODE)
920 if support.TESTFN_UNENCODABLE:
921 add_filename(support.TESTFN_UNENCODABLE)
922 if not bytesfn:
923 self.skipTest("couldn't create any non-ascii filename")
924
925 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +0000926 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000927 try:
928 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +0200929 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +0000930 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000931 if fn in self.unicodefn:
932 raise ValueError("duplicate filename")
933 self.unicodefn.add(fn)
934 except:
935 shutil.rmtree(self.dir)
936 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +0000937
938 def tearDown(self):
939 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +0000940
941 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000942 expected = self.unicodefn
943 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000944 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +0000945
946 def test_open(self):
947 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +0200948 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +0000949 f.close()
950
951 def test_stat(self):
952 for fn in self.unicodefn:
953 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000954else:
955 class PosixUidGidTests(unittest.TestCase):
956 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +0000957 class Pep383Tests(unittest.TestCase):
958 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000959
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows: exit codes and console events."""

    def _kill(self, sig):
        """Kill a child interpreter with *sig* and check its return code.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready.  When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting.  This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the
            # pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the retries ran out or the child exited before
            # writing anything.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and require it to stop.

        The child runs win_console_handler.py and is given the tag name of
        a one-byte shared mmap; this method waits for that byte to become 1
        before sending the event.  *name* is only used in the failure
        message.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the mapping when the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if it is still alive; the loop
            # also ends here when the child has already exited, in which
            # case there is nothing left to kill.
            if proc.poll() is None:
                os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an exit
        # code of 0 also means it has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it.  This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1074
1075
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic links to files and directories on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: both targets exist and no link is left over from an
        # earlier run.  unittest assertions are used so the checks are not
        # stripped when Python runs with -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() because the missing link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check stat() of *link* equals stat() of *target* but not lstat().

        The same checks are repeated with the link name passed as bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # os.stat() on a relative symlink must resolve to the target from
        # the link's own directory and from directories above and below it.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Bound before the try block so the cleanup below can always refer
        # to it, even when creating the directories fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so a failure during
            # setup is not masked by an error raised from the cleanup.
            if os.path.lexists(file1):
                os.remove(file1)
            if os.path.isdir(level1):
                shutil.rmtree(level1)
1191
Brian Curtind40e6f72010-07-08 21:39:08 +00001192
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes passed to fsencode() and str passed to fsdecode() come back
        # equal to the input.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding: skip it.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001206
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001207
class PidTests(unittest.TestCase):
    """Tests for the process-id functions of os."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid, which must be our own pid.
        command = [sys.executable, '-c',
                   'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1217
1218
Brian Curtin0151b8e2010-09-24 13:43:43 +00001219# The introduction of this TestCase caused at least two different errors on
1220# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Tests for os.getlogin(); the whole class is currently skipped."""

    def test_getlogin(self):
        # The login name must not be empty.
        self.assertNotEqual(len(os.getlogin()), 0)
1227
1228
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                self.skipTest(
      "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Put the old priority back; an EACCES failure while doing so
            # is ignored, any other error propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1251
1252
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """asyncore-based TCP server that runs its loop in its own thread.

        The handler created for an accepted connection greets the client
        with "220 ready" and buffers everything it receives afterwards.
        """

        class Handler(asynchat.async_chat):
            """Connection handler that accumulates all received data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                self.in_buffer.append(self.recv(4096))

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Address actually bound (the caller may have asked for port 0).
            bound = self.socket.getsockname()
            self.host, self.port = bound[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the server thread and wait until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to finish and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while True:
                if getattr(self.handler_instance, "closed", False):
                    break
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            # One asyncore iteration at a time, so that clearing _active
            # is noticed between iterations.
            while self._active and asyncore.socket_map:
                self._active_lock.acquire()
                asyncore.loop(timeout=0.001, count=1)
                self._active_lock.release()
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()

        # Read events on the listening socket are handled like connects.
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            raise
1336
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001337
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001338@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001339@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1340class TestSendfile(unittest.TestCase):
1341
# Payload written to the test file and sent through os.sendfile().
DATA = b"12345abcde" * 16 * 1024  # 160 KB
# True on every platform whose name does not start with one of these
# prefixes; the tests pass headers/trailers to os.sendfile() only then.
SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
    ("linux", "solaris", "sunos"))
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001346
@classmethod
def setUpClass(cls):
    """Write cls.DATA to support.TESTFN once for the whole class."""
    with open(support.TESTFN, "wb") as data_file:
        data_file.write(cls.DATA)
1351
@classmethod
def tearDownClass(cls):
    """Remove the support.TESTFN file once the class has run."""
    support.unlink(support.TESTFN)
1355
def setUp(self):
    """Start a server, connect a client to it and open the data file."""
    server = SendfileTestServer((support.HOST, 0))
    self.server = server
    server.start()
    client = socket.socket()
    self.client = client
    client.connect((server.host, server.port))
    client.settimeout(1)
    # synchronize by waiting for "220 ready" response
    client.recv(1024)
    self.sockno = client.fileno()
    self.file = open(support.TESTFN, 'rb')
    self.fileno = self.file.fileno()
1367
def tearDown(self):
    """Close the data file and the client, then stop a running server."""
    for resource in (self.file, self.client):
        resource.close()
    server = self.server
    if server.running:
        server.stop()
1373
def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
    """A higher level wrapper representing how an application is
    supposed to use sendfile().

    Call os.sendfile(sock, file, offset, nbytes) and return its result.
    *headers* and *trailers* are passed on only when
    self.SUPPORT_HEADERS_TRAILERS is true.  The call is retried for as long
    as it fails with EAGAIN or EBUSY; any other OSError (ECONNRESET
    included) propagates to the caller.

    The defaults are empty tuples instead of lists so that no mutable
    object is shared between calls.
    """
    while True:
        try:
            if self.SUPPORT_HEADERS_TRAILERS:
                return os.sendfile(sock, file, offset, nbytes, headers,
                                   trailers)
            return os.sendfile(sock, file, offset, nbytes)
        except OSError as err:
            # EAGAIN/EBUSY: we have to retry send data.  Everything else,
            # such as a disconnect, is an error for the caller.
            if err.errno not in (errno.EAGAIN, errno.EBUSY):
                raise
1394
def test_send_whole_file(self):
    # normal send: push the whole file in chunks of at most 4096 bytes
    expected_size = len(self.DATA)
    chunk = 4096
    position = 0
    total_sent = 0
    while total_sent < expected_size:
        sent = self.sendfile_wrapper(self.sockno, self.fileno, position,
                                     chunk)
        if sent == 0:
            break
        position += sent
        total_sent += sent
        self.assertLessEqual(sent, chunk)
        self.assertEqual(position, total_sent)

    self.assertEqual(total_sent, expected_size)
    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    # Everything the server received must match the file content.
    received = self.server.handler_instance.get_data()
    self.assertEqual(len(received), expected_size)
    self.assertEqual(received, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001416
def test_send_at_certain_offset(self):
    # Start sending the file at a certain offset (its midpoint) and
    # check that the server receives exactly the second half.
    total_sent = 0
    start = len(self.DATA) // 2
    offset = start
    must_send = len(self.DATA) - start
    nbytes = 4096
    while total_sent < must_send:
        sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                     nbytes)
        if sent == 0:
            # Nothing was transferred; stop instead of looping forever.
            break
        offset += sent
        total_sent += sent
        # assertLessEqual reports both values on failure, unlike
        # assertTrue(sent <= nbytes).
        self.assertLessEqual(sent, nbytes)

    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    data = self.server.handler_instance.get_data()
    expected = self.DATA[start:]
    self.assertEqual(total_sent, len(expected))
    # Compare lengths first so a size mismatch is reported concisely.
    self.assertEqual(len(data), len(expected))
    self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001439
def test_offset_overflow(self):
    # Specify an offset > file size: sendfile() must either send
    # nothing or (see below) fail with EINVAL, and the server must
    # receive no data at all.
    past_end = len(self.DATA) + 4096
    try:
        sent = os.sendfile(self.sockno, self.fileno, past_end, 4096)
    except OSError as exc:
        # Solaris can raise EINVAL if offset >= file length, ignore.
        if exc.errno != errno.EINVAL:
            raise
    else:
        self.assertEqual(sent, 0)

    client = self.client
    client.shutdown(socket.SHUT_RDWR)
    client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    self.assertEqual(received, b'')
1456
def test_invalid_offset(self):
    # A negative offset must make sendfile() fail with EINVAL.
    with self.assertRaises(OSError) as caught:
        os.sendfile(self.sockno, self.fileno, -1, 4096)
    failure = caught.exception
    self.assertEqual(failure.errno, errno.EINVAL)
1461
1462 # --- headers / trailers tests
1463
1464 if SUPPORT_HEADERS_TRAILERS:
1465
def test_headers(self):
    # Send a 512-byte header in front of the fixture file and check
    # that the server receives header + file contents.
    total_sent = 0
    sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                       headers=[b"x" * 512])
    total_sent += sent
    # NOTE(review): resuming at 4096 presumes the first call sent the
    # header plus the complete first 4096-byte chunk -- confirm this
    # holds on every platform that runs this test.
    offset = 4096
    nbytes = 4096
    while True:
        sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                     offset, nbytes)
        if sent == 0:
            break
        total_sent += sent
        offset += sent

    expected_data = b"x" * 512 + self.DATA
    self.assertEqual(total_sent, len(expected_data))
    self.client.close()
    self.server.wait()
    data = self.server.handler_instance.get_data()
    # Check the payload itself rather than its hash(), and compare
    # lengths first, as the other tests here do.
    self.assertEqual(len(data), len(expected_data))
    self.assertEqual(data, expected_data)
1487
def test_trailers(self):
    # Send a small file followed by a trailer and check that the
    # server receives file contents + trailer.
    trailer_src = support.TESTFN + "2"
    with open(trailer_src, 'wb') as out:
        out.write(b"abcde")
    with open(trailer_src, 'rb') as src:
        self.addCleanup(os.remove, trailer_src)
        os.sendfile(self.sockno, src.fileno(), 0, 4096,
                    trailers=[b"12345"])
        self.client.close()
        self.server.wait()
        received = self.server.handler_instance.get_data()
        self.assertEqual(received, b"abcde12345")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001500
1501 if hasattr(os, "SF_NODISKIO"):
def test_flags(self):
    # Passing flags=os.SF_NODISKIO must either succeed or fail with
    # EBUSY / EAGAIN; any other OSError is a real failure.
    try:
        os.sendfile(self.sockno, self.fileno, 0, 4096,
                    flags=os.SF_NODISKIO)
    except OSError as exc:
        if exc.errno in (errno.EBUSY, errno.EAGAIN):
            return
        raise
1509
1510
def supports_extended_attributes():
    """Return True if usable extended attribute support is available.

    Returns False when the os module lacks setxattr(), when setting an
    attribute on a scratch file fails with ENOTSUP, or when
    platform.release() reports a 2.6.x kernel older than 2.6.39.  Any
    other OSError raised by the probe is propagated.  The scratch file
    (support.TESTFN) is always removed.
    """
    if not hasattr(os, "setxattr"):
        return False
    try:
        with open(support.TESTFN, "wb") as fp:
            try:
                os.fsetxattr(fp.fileno(), b"user.test", b"")
            except OSError as e:
                if e.errno != errno.ENOTSUP:
                    raise
                return False
    finally:
        support.unlink(support.TESTFN)
    # Kernels < 2.6.39 don't respect setxattr flags.
    kernel_version = platform.release()
    # Raw string so "\d" is a regex escape rather than an invalid string
    # escape; dots are escaped so they match only a literal ".".
    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
    return m is None or int(m.group(1)) >= 39
1528
1529
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for the os xattr functions in their plain, l* and f* forms."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr):
        # Run one full get/set/remove/list scenario on a fresh file.
        # `s` converts an attribute name given as str into the type
        # under test (str or bytes); the other four arguments are the
        # functions being exercised, all taking a path first.
        fn = support.TESTFN
        open(fn, "wb").close()
        # A fresh file has no attributes.
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"))
        self.assertEqual(cm.exception.errno, errno.ENODATA)
        self.assertEqual(listxattr(fn), [])
        # Create, then replace, an attribute.
        setxattr(fn, s("user.test"), b"")
        self.assertEqual(listxattr(fn), ["user.test"])
        self.assertEqual(getxattr(fn, b"user.test"), b"")
        setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE)
        self.assertEqual(getxattr(fn, b"user.test"), b"hello")
        # XATTR_CREATE must refuse an existing attribute ...
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE)
        self.assertEqual(cm.exception.errno, errno.EEXIST)
        # ... and XATTR_REPLACE must refuse a missing one.
        with self.assertRaises(OSError) as cm:
            setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE)
        self.assertEqual(cm.exception.errno, errno.ENODATA)
        setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE)
        self.assertEqual(sorted(listxattr(fn)), ["user.test", "user.test2"])
        # Removal leaves the other attribute untouched.
        removexattr(fn, s("user.test"))
        with self.assertRaises(OSError) as cm:
            getxattr(fn, s("user.test"))
        self.assertEqual(cm.exception.errno, errno.ENODATA)
        self.assertEqual(listxattr(fn), ["user.test2"])
        self.assertEqual(getxattr(fn, s("user.test2")), b"foo")
        # A larger (1024-byte) value round-trips.
        setxattr(fn, s("user.test"), b"a"*1024)
        self.assertEqual(getxattr(fn, s("user.test")), b"a"*1024)
        removexattr(fn, s("user.test"))
        # Many attributes at once; names go through `s` like every
        # other name above so the bytes pass really uses bytes names.
        many = sorted("user.test{}".format(i) for i in range(100))
        for thing in many:
            setxattr(fn, s(thing), b"x")
        self.assertEqual(sorted(listxattr(fn)), many)

    def _check_xattrs(self, *args):
        # Run the scenario twice: with str names, then with bytes names.
        def make_bytes(s):
            return bytes(s, "ascii")
        self._check_xattrs_str(str, *args)
        # Start the second pass from a clean file.
        support.unlink(support.TESTFN)
        self._check_xattrs_str(make_bytes, *args)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.lgetxattr, os.lsetxattr, os.lremovexattr,
                           os.llistxattr)

    def test_fds(self):
        # Adapt the fd-based functions to the path-based signature
        # expected by _check_xattrs by opening the path on each call.
        def getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.fgetxattr(fp.fileno(), *args)
        def setxattr(path, *args):
            with open(path, "wb") as fp:
                os.fsetxattr(fp.fileno(), *args)
        def removexattr(path, *args):
            with open(path, "wb") as fp:
                os.fremovexattr(fp.fileno(), *args)
        def listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.flistxattr(fp.fileno(), *args)
        self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
1600
1601
@support.reap_threads
def test_main():
    """Run every test case class of this module via support.run_unittest."""
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
        ExtendedAttributeTests,
    )
    support.run_unittest(*test_classes)


# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()