blob: 1d5f11cec1cb3414b3887e09faba3e77698954a8 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
5import os
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00006import errno
Fred Drake38c2ef02001-07-17 20:52:51 +00007import unittest
Jeremy Hyltona7fc21b2001-08-20 20:10:01 +00008import warnings
Thomas Wouters477c8d52006-05-27 19:21:47 +00009import sys
Brian Curtineb24d742010-04-12 17:16:38 +000010import signal
11import subprocess
12import time
Martin v. Löwis011e8422009-05-05 04:43:17 +000013import shutil
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Victor Stinnerc2d095f2010-05-17 00:14:53 +000015import contextlib
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000016import mmap
17import uuid
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000018import asyncore
19import asynchat
20import socket
21try:
22 import threading
23except ImportError:
24 threading = None
Fred Drake38c2ef02001-07-17 20:52:51 +000025
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
#
# The flag is True only when sys.thread_info exists, reports a non-empty
# version string, and that string starts with "linuxthreads".
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads")
)
Brian Curtineb24d742010-04-12 17:16:38 +000034
Thomas Wouters0e3f5912006-08-11 14:57:12 +000035# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file functions that create support.TESTFN."""

    def setUp(self):
        # Remove any leftover TESTFN.  The same routine doubles as tearDown
        # (see the alias below), so each test starts and ends clean.
        if os.path.exists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        """A freshly created file must be reported as writable."""
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        """os.closerange() closes open fds and ignores ones that are not."""
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        """A failing os.rename() must not change the path's refcount."""
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        """os.read() returns the bytes previously written to the file."""
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            os.lseek(fd, 0, 0)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when an assertion or write fails,
            # so a failing test does not leak it into later tests.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run *args* as a command in a new console and require exit code 0."""
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only and wrap the fd with os.fdopen(fd, *args)."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        """os.fdopen() accepts no extra args, a mode, and a mode + bufsize."""
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)
129
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200130
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000131# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Test attributes on return values from the os.*stat* family."""

    def setUp(self):
        # Create directory TESTFN containing a 3-byte file "f1".
        os.mkdir(support.TESTFN)
        self.fname = os.path.join(support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(support.TESTFN)

    def check_stat_attributes(self, fname):
        """Check the os.stat() result for *fname* (a str or bytes path)."""
        if not hasattr(os, "stat"):
            return

        import stat
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                if name.endswith("TIME"):
                    # Time attributes are compared after truncating the
                    # attribute value to an int.
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                 result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but, as in the original test, not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        self.check_stat_attributes(fname)

    def test_statvfs_attributes(self):
        """Check the os.statvfs() result for the test file."""
        if not hasattr(os, "statvfs"):
            return

        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                return
            # Any other failure is a real error: propagate it instead of
            # falling through with 'result' unbound.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but, as in the original test, not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        """os.utime() must be able to change the mtime of a directory."""
        delta = 1000000
        st = os.stat(support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict test to Win32, since there is no guarantee other
    # systems support centiseconds
    if sys.platform == 'win32':
        def get_file_system(path):
            """Return the file system name of *path*'s volume.

            Returns None (implicitly) when GetVolumeInformationW fails.
            """
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            if kernel32.GetVolumeInformationW(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

        if get_file_system(support.TESTFN) == "NTFS":
            def test_1565150(self):
                t1 = 1159195039.25
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

            def test_large_time(self):
                t1 = 5000000000 # some day in 2128
                os.utime(self.fname, (t1, t1))
                self.assertEqual(os.stat(self.fname).st_mtime, t1)

        def test_1686475(self):
            # Verify that an open file can be stat'ed
            try:
                os.stat(r"c:\pagefile.sys")
            except WindowsError as e:
                if e.errno == 2: # file does not exist; cannot run test
                    return
                self.fail("Could not stat pagefile.sys")
301
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000302from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000303
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000304class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000305 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000306 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000307
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000308 def setUp(self):
309 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000310 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000311 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000312 for key, value in self._reference().items():
313 os.environ[key] = value
314
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000315 def tearDown(self):
316 os.environ.clear()
317 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000318 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000319 os.environb.clear()
320 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000321
Christian Heimes90333392007-11-01 19:08:42 +0000322 def _reference(self):
323 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
324
325 def _empty_mapping(self):
326 os.environ.clear()
327 return os.environ
328
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000329 # Bug 1110478
Martin v. Löwis5510f652005-02-17 21:23:20 +0000330 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000331 os.environ.clear()
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000332 if os.path.exists("/bin/sh"):
333 os.environ.update(HELLO="World")
Brian Curtin810921b2010-10-30 21:24:21 +0000334 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
335 value = popen.read().strip()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000336 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000337
Christian Heimes1a13d592007-11-08 14:16:55 +0000338 def test_os_popen_iter(self):
339 if os.path.exists("/bin/sh"):
Brian Curtin810921b2010-10-30 21:24:21 +0000340 with os.popen(
341 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
342 it = iter(popen)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000343 self.assertEqual(next(it), "line1\n")
344 self.assertEqual(next(it), "line2\n")
345 self.assertEqual(next(it), "line3\n")
Brian Curtin810921b2010-10-30 21:24:21 +0000346 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000347
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000348 # Verify environ keys and values from the OS are of the
349 # correct str type.
350 def test_keyvalue_types(self):
351 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000352 self.assertEqual(type(key), str)
353 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000354
Christian Heimes90333392007-11-01 19:08:42 +0000355 def test_items(self):
356 for key, value in self._reference().items():
357 self.assertEqual(os.environ.get(key), value)
358
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000359 # Issue 7310
360 def test___repr__(self):
361 """Check that the repr() of os.environ looks like environ({...})."""
362 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000363 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
364 '{!r}: {!r}'.format(key, value)
365 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000366
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000367 def test_get_exec_path(self):
368 defpath_list = os.defpath.split(os.pathsep)
369 test_path = ['/monty', '/python', '', '/flying/circus']
370 test_env = {'PATH': os.pathsep.join(test_path)}
371
372 saved_environ = os.environ
373 try:
374 os.environ = dict(test_env)
375 # Test that defaulting to os.environ works.
376 self.assertSequenceEqual(test_path, os.get_exec_path())
377 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
378 finally:
379 os.environ = saved_environ
380
381 # No PATH environment variable
382 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
383 # Empty PATH environment variable
384 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
385 # Supplied PATH environment variable
386 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
387
Victor Stinnerb745a742010-05-18 17:17:23 +0000388 if os.supports_bytes_environ:
389 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000390 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000391 # ignore BytesWarning warning
392 with warnings.catch_warnings(record=True):
393 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000394 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000395 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000396 pass
397 else:
398 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000399
400 # bytes key and/or value
401 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
402 ['abc'])
403 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
404 ['abc'])
405 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
406 ['abc'])
407
408 @unittest.skipUnless(os.supports_bytes_environ,
409 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000410 def test_environb(self):
411 # os.environ -> os.environb
412 value = 'euro\u20ac'
413 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000414 value_bytes = value.encode(sys.getfilesystemencoding(),
415 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000416 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000417 msg = "U+20AC character is not encodable to %s" % (
418 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000419 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000420 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000421 self.assertEqual(os.environ['unicode'], value)
422 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000423
424 # os.environb -> os.environ
425 value = b'\xff'
426 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000427 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000428 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000429 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000430
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        """Walk a small tree top-down, pruned, bottom-up and via symlinks."""
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(support.TESTFN, "TEST2")
        tmp4_path = join(support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if support.can_symlink():
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is removed as a file, not rmdir'ed.
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000538
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh support.TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            # Restore the process umask even if an assertion fails, so the
            # change cannot leak into other tests.
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            # Remove the file before tearDown runs: os.removedirs() there
            # cannot delete a regular file.
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
590
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull path."""

    def test_devnull(self):
        """Writes to os.devnull succeed and reads from it return no data."""
        # The with-statement closes the file; no explicit close() is needed.
        with open(os.devnull, 'wb') as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +0000598
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom(self):
        """os.urandom(n) returns exactly n bytes for several sizes."""
        try:
            for size in (1, 10, 100, 1000):
                self.assertEqual(len(os.urandom(size)), size)
        except NotImplementedError:
            # os.urandom() raised NotImplementedError; tolerate it.
            pass
608
@contextlib.contextmanager
def _execvpe_mockup(defpath=None):
    """
    Stubs out execv and execve functions when used as context manager.
    Records exec calls. The mock execv and execve functions always raise an
    exception as they would normally never return.

    If *defpath* is not None, os.defpath is replaced by it for the duration
    of the context.  The context manager yields the list of recorded calls;
    os.execv, os.execve and os.defpath are restored on exit.
    """
    # A list of tuples containing (function name, first arg, args)
    # of calls to execv or execve that have been made.
    calls = []

    def mock_execv(name, *args):
        calls.append(('execv', name, args))
        raise RuntimeError("execv called")

    def mock_execve(name, *args):
        calls.append(('execve', name, args))
        raise OSError(errno.ENOTDIR, "execve called")

    # Save the originals before entering the try block, so the finally
    # clause can never reference an unbound name and mask the real error.
    orig_execv = os.execv
    orig_execve = os.execve
    orig_defpath = os.defpath
    try:
        os.execv = mock_execv
        os.execve = mock_execve
        if defpath is not None:
            os.defpath = defpath
        yield calls
    finally:
        os.execv = orig_execv
        os.execve = orig_execve
        os.defpath = orig_defpath
641
class ExecTests(unittest.TestCase):
    """Tests for os.execvpe() and the internal os._execvpe() helper."""

    @unittest.skipIf(USING_LINUXTHREADS,
                     "avoid triggering a linuxthreads bug: see issue #4970")
    def test_execvpe_with_bad_program(self):
        # A program that does not exist must make execvpe raise OSError.
        with self.assertRaises(OSError):
            os.execvpe('no such app-', ['no such app-'], None)

    def test_execvpe_with_bad_arglist(self):
        # An empty argument list is rejected with ValueError.
        with self.assertRaises(ValueError):
            os.execvpe('notepad', [], None)

    @unittest.skipUnless(hasattr(os, '_execvpe'),
                         "No internal os._execvpe function to test.")
    def _test_internal_execvpe(self, test_type):
        """Drive os._execvpe() with str or bytes program names.

        *test_type* is either ``str`` or ``bytes`` and selects the type of
        the program name.  os.execv/os.execve are stubbed out by
        _execvpe_mockup(), so the checks only look at the recorded calls.
        """
        program_path = os.sep + 'absolutepath'
        if test_type is bytes:
            program = b'executable'
            fullpath = os.path.join(os.fsencode(program_path), program)
            native_fullpath = fullpath
            arguments = [b'progname', 'arg1', 'arg2']
        else:
            program = 'executable'
            arguments = ['progname', 'arg1', 'arg2']
            fullpath = os.path.join(program_path, program)
            native_fullpath = (fullpath if os.name == "nt"
                               else os.fsencode(fullpath))
        env = {'spam': 'beans'}

        # test os._execvpe() with an absolute path
        with _execvpe_mockup() as calls:
            with self.assertRaises(RuntimeError):
                os._execvpe(fullpath, arguments)
            self.assertEqual(len(calls), 1)
            self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() returns defpath
        with _execvpe_mockup(defpath=program_path) as calls:
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env)))

        # test os._execvpe() with a relative path:
        # os.get_exec_path() reads the 'PATH' variable
        with _execvpe_mockup() as calls:
            env_path = env.copy()
            path_key = b'PATH' if test_type is bytes else 'PATH'
            env_path[path_key] = program_path
            with self.assertRaises(OSError):
                os._execvpe(program, arguments, env=env_path)
            self.assertEqual(len(calls), 1)
            self.assertSequenceEqual(
                calls[0], ('execve', native_fullpath, (arguments, env_path)))

    def test_internal_execvpe_str(self):
        # The bytes variant is only exercised outside of Windows.
        self._test_internal_execvpe(str)
        if os.name != "nt":
            self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +0000705
Gregory P. Smith4ae37772010-05-08 18:05:46 +0000706
Thomas Wouters477c8d52006-05-27 19:21:47 +0000707class Win32ErrorTests(unittest.TestCase):
708 def test_rename(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000709 self.assertRaises(WindowsError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +0000710
711 def test_remove(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000712 self.assertRaises(WindowsError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000713
714 def test_chdir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000715 self.assertRaises(WindowsError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000716
717 def test_mkdir(self):
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000718 f = open(support.TESTFN, "w")
Benjamin Petersonf91df042009-02-13 02:50:59 +0000719 try:
720 self.assertRaises(WindowsError, os.mkdir, support.TESTFN)
721 finally:
722 f.close()
Amaury Forgeot d'Arc2fc224f2009-02-19 23:23:47 +0000723 os.unlink(support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000724
725 def test_utime(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000726 self.assertRaises(WindowsError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000727
Thomas Wouters477c8d52006-05-27 19:21:47 +0000728 def test_chmod(self):
Benjamin Petersonf91df042009-02-13 02:50:59 +0000729 self.assertRaises(WindowsError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +0000730
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000731class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +0000732 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000733 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
734 #singles.append("close")
735 #We omit close because it doesn'r raise an exception on some platforms
736 def get_single(f):
737 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000738 if hasattr(os, f):
739 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000740 return helper
741 for f in singles:
742 locals()["test_"+f] = get_single(f)
743
Benjamin Peterson7522c742009-01-19 21:00:09 +0000744 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +0000745 try:
746 f(support.make_bad_fd(), *args)
747 except OSError as e:
748 self.assertEqual(e.errno, errno.EBADF)
749 else:
750 self.fail("%r didn't raise a OSError with a bad file descriptor"
751 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +0000752
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000753 def test_isatty(self):
754 if hasattr(os, "isatty"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000755 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000756
757 def test_closerange(self):
758 if hasattr(os, "closerange"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000759 fd = support.make_bad_fd()
R. David Murray630cc482009-07-22 15:20:27 +0000760 # Make sure none of the descriptors we are about to close are
761 # currently valid (issue 6542).
762 for i in range(10):
763 try: os.fstat(fd+i)
764 except OSError:
765 pass
766 else:
767 break
768 if i < 2:
769 raise unittest.SkipTest(
770 "Unable to acquire a range of invalid file descriptors")
771 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000772
773 def test_dup2(self):
774 if hasattr(os, "dup2"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000775 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000776
777 def test_fchmod(self):
778 if hasattr(os, "fchmod"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000779 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000780
781 def test_fchown(self):
782 if hasattr(os, "fchown"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000783 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000784
785 def test_fpathconf(self):
786 if hasattr(os, "fpathconf"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000787 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000788
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000789 def test_ftruncate(self):
790 if hasattr(os, "ftruncate"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000791 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000792
793 def test_lseek(self):
794 if hasattr(os, "lseek"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000795 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000796
797 def test_read(self):
798 if hasattr(os, "read"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000799 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000800
801 def test_tcsetpgrpt(self):
802 if hasattr(os, "tcsetpgrp"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000803 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000804
805 def test_write(self):
806 if hasattr(os, "write"):
Benjamin Peterson7522c742009-01-19 21:00:09 +0000807 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +0000808
Brian Curtin1b9df392010-11-24 20:24:31 +0000809
810class LinkTests(unittest.TestCase):
811 def setUp(self):
812 self.file1 = support.TESTFN
813 self.file2 = os.path.join(support.TESTFN + "2")
814
Brian Curtinc0abc4e2010-11-30 23:46:54 +0000815 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +0000816 for file in (self.file1, self.file2):
817 if os.path.exists(file):
818 os.unlink(file)
819
Brian Curtin1b9df392010-11-24 20:24:31 +0000820 def _test_link(self, file1, file2):
821 with open(file1, "w") as f1:
822 f1.write("test")
823
824 os.link(file1, file2)
825 with open(file1, "r") as f1, open(file2, "r") as f2:
826 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
827
828 def test_link(self):
829 self._test_link(self.file1, self.file2)
830
831 def test_link_bytes(self):
832 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
833 bytes(self.file2, sys.getfilesystemencoding()))
834
Brian Curtinf498b752010-11-30 15:54:04 +0000835 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +0000836 try:
Brian Curtinf498b752010-11-30 15:54:04 +0000837 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +0000838 except UnicodeError:
839 raise unittest.SkipTest("Unable to encode for this platform.")
840
Brian Curtinf498b752010-11-30 15:54:04 +0000841 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +0000842 self.file2 = self.file1 + "2"
843 self._test_link(self.file1, self.file2)
844
Thomas Wouters477c8d52006-05-27 19:21:47 +0000845if sys.platform != 'win32':
846 class Win32ErrorTests(unittest.TestCase):
847 pass
848
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000849 class PosixUidGidTests(unittest.TestCase):
850 if hasattr(os, 'setuid'):
851 def test_setuid(self):
852 if os.getuid() != 0:
853 self.assertRaises(os.error, os.setuid, 0)
854 self.assertRaises(OverflowError, os.setuid, 1<<32)
855
856 if hasattr(os, 'setgid'):
857 def test_setgid(self):
858 if os.getuid() != 0:
859 self.assertRaises(os.error, os.setgid, 0)
860 self.assertRaises(OverflowError, os.setgid, 1<<32)
861
862 if hasattr(os, 'seteuid'):
863 def test_seteuid(self):
864 if os.getuid() != 0:
865 self.assertRaises(os.error, os.seteuid, 0)
866 self.assertRaises(OverflowError, os.seteuid, 1<<32)
867
868 if hasattr(os, 'setegid'):
869 def test_setegid(self):
870 if os.getuid() != 0:
871 self.assertRaises(os.error, os.setegid, 0)
872 self.assertRaises(OverflowError, os.setegid, 1<<32)
873
874 if hasattr(os, 'setreuid'):
875 def test_setreuid(self):
876 if os.getuid() != 0:
877 self.assertRaises(os.error, os.setreuid, 0, 0)
878 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
879 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000880
881 def test_setreuid_neg1(self):
882 # Needs to accept -1. We run this in a subprocess to avoid
883 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000884 subprocess.check_call([
885 sys.executable, '-c',
886 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000887
888 if hasattr(os, 'setregid'):
889 def test_setregid(self):
890 if os.getuid() != 0:
891 self.assertRaises(os.error, os.setregid, 0, 0)
892 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
893 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000894
895 def test_setregid_neg1(self):
896 # Needs to accept -1. We run this in a subprocess to avoid
897 # altering the test runner's process state (issue8045).
Benjamin Petersonebe87ba2010-03-06 20:34:24 +0000898 subprocess.check_call([
899 sys.executable, '-c',
900 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Martin v. Löwis011e8422009-05-05 04:43:17 +0000901
902 class Pep383Tests(unittest.TestCase):
Martin v. Löwis011e8422009-05-05 04:43:17 +0000903 def setUp(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000904 if support.TESTFN_UNENCODABLE:
905 self.dir = support.TESTFN_UNENCODABLE
906 else:
907 self.dir = support.TESTFN
908 self.bdir = os.fsencode(self.dir)
909
910 bytesfn = []
911 def add_filename(fn):
912 try:
913 fn = os.fsencode(fn)
914 except UnicodeEncodeError:
915 return
916 bytesfn.append(fn)
917 add_filename(support.TESTFN_UNICODE)
918 if support.TESTFN_UNENCODABLE:
919 add_filename(support.TESTFN_UNENCODABLE)
920 if not bytesfn:
921 self.skipTest("couldn't create any non-ascii filename")
922
923 self.unicodefn = set()
Martin v. Löwis011e8422009-05-05 04:43:17 +0000924 os.mkdir(self.dir)
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000925 try:
926 for fn in bytesfn:
Victor Stinnerbf816222011-06-30 23:25:47 +0200927 support.create_empty_file(os.path.join(self.bdir, fn))
Victor Stinnere8d51452010-08-19 01:05:19 +0000928 fn = os.fsdecode(fn)
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000929 if fn in self.unicodefn:
930 raise ValueError("duplicate filename")
931 self.unicodefn.add(fn)
932 except:
933 shutil.rmtree(self.dir)
934 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +0000935
936 def tearDown(self):
937 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +0000938
939 def test_listdir(self):
Victor Stinnerd91df1a2010-08-18 10:56:19 +0000940 expected = self.unicodefn
941 found = set(os.listdir(self.dir))
Ezio Melottib3aedd42010-11-20 19:04:17 +0000942 self.assertEqual(found, expected)
Martin v. Löwis011e8422009-05-05 04:43:17 +0000943
944 def test_open(self):
945 for fn in self.unicodefn:
Victor Stinnera6d2c762011-06-30 18:20:11 +0200946 f = open(os.path.join(self.dir, fn), 'rb')
Martin v. Löwis011e8422009-05-05 04:43:17 +0000947 f.close()
948
949 def test_stat(self):
950 for fn in self.unicodefn:
951 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000952else:
953 class PosixUidGidTests(unittest.TestCase):
954 pass
Martin v. Löwis011e8422009-05-05 04:43:17 +0000955 class Pep383Tests(unittest.TestCase):
956 pass
Benjamin Petersonef3e4c22009-04-11 19:48:14 +0000957
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows (signals and console control events)."""

    def _kill(self, sig):
        """Kill a child interpreter with *sig* and check its exit code.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready.  When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting.  This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE,  # Pipe handle
                                  ctypes.POINTER(ctypes.c_char),  # stdout buf
                                  wintypes.DWORD,  # Buffer size
                                  ctypes.POINTER(wintypes.DWORD),  # bytes read
                                  ctypes.POINTER(wintypes.DWORD),  # bytes avail
                                  ctypes.POINTER(wintypes.DWORD))  # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to max_tries * 0.1 seconds while the child is alive.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a helper script and expect exit.

        *name* is only used in the failure message.  The helper script is
        given the tag name of a one-byte shared mmap; this test waits for
        that byte to become 1 before sending the event.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared mapping once the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit code of 0 must not be mistaken for "did not stop".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting CTRL+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle CTRL+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1072
1073
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for symbolic links to files and directories on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Use assert* methods rather than assert statements so that the
        # preconditions are still checked when running under -O.
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() follows *link* to *target* and lstat() does not.

        The same checks are repeated with the link name encoded to bytes.
        """
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        bytes_link = os.fsencode(link)
        self.assertEqual(os.stat(bytes_link), os.stat(target))
        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        # stat() through a relative symlink must work from the link's own
        # directory as well as from directories above and below it.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        # Bound before the try block so that the cleanup in ``finally``
        # never hits an unbound name when directory creation fails.
        file1 = os.path.abspath(os.path.join(level1, "file1"))
        try:
            os.mkdir(level1)
            os.mkdir(level2)
            os.mkdir(level3)

            with open(file1, "w") as f:
                f.write("file1")

            orig_dir = os.getcwd()
            try:
                os.chdir(level2)
                link = os.path.join(level2, "link")
                os.symlink(os.path.relpath(file1), "link")
                self.assertIn("link", os.listdir(os.getcwd()))

                # Check os.stat calls from the same dir as the link
                self.assertEqual(os.stat(file1), os.stat("link"))

                # Check os.stat calls from a dir below the link
                os.chdir(level1)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))

                # Check os.stat calls from a dir above the link
                os.chdir(level3)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
            finally:
                os.chdir(orig_dir)
        except OSError as err:
            self.fail(err)
        finally:
            # Only remove what was actually created, so a setup failure is
            # not masked by a secondary error raised during cleanup.
            if os.path.lexists(file1):
                os.remove(file1)
            if os.path.isdir(level1):
                shutil.rmtree(level1)
1189
Brian Curtind40e6f72010-07-08 21:39:08 +00001190
class FSEncodingTests(unittest.TestCase):
    """Tests for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # Values that already have the target type pass through unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), b'abc\xff')
        self.assertEqual(os.fsdecode(text), 'abc\u0141')

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        samples = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in samples:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Not representable in the filesystem encoding: skip it.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00001204
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00001205
class PidTests(unittest.TestCase):
    """Tests for process id queries."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # Have a child interpreter print its parent's pid.
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())
1215
1216
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Test for os.getlogin(); currently skipped unconditionally."""

    def test_getlogin(self):
        # The login name must be a non-empty string.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
1225
1226
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own nice value by one and read it back.
        base = os.getpriority(os.PRIO_PROCESS, os.getpid())
        os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest(
      "unable to reliably test setpriority at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Best-effort restore of the previous priority: an EACCES
            # failure is ignored, any other OSError propagates.
            try:
                os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
1249
1250
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening asyncore dispatcher whose loop runs in its own thread.

        The handler created for the most recently accepted connection is
        kept in ``handler_instance``; it accumulates everything it receives.
        """

        class Handler(asynchat.async_chat):
            """Per-connection handler that buffers all received data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting pushed to the peer as soon as it is connected.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            """Start the server thread and wait until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to finish and join the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # The context manager releases the lock even when
                # asyncore.loop() raises.
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
1334
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001335
Giampaolo Rodolà46134642011-02-25 20:01:05 +00001336@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001337@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
1338class TestSendfile(unittest.TestCase):
1339
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00001340 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001341 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00001342 not sys.platform.startswith("solaris") and \
1343 not sys.platform.startswith("sunos")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001344
1345 @classmethod
1346 def setUpClass(cls):
1347 with open(support.TESTFN, "wb") as f:
1348 f.write(cls.DATA)
1349
1350 @classmethod
1351 def tearDownClass(cls):
1352 support.unlink(support.TESTFN)
1353
1354 def setUp(self):
1355 self.server = SendfileTestServer((support.HOST, 0))
1356 self.server.start()
1357 self.client = socket.socket()
1358 self.client.connect((self.server.host, self.server.port))
1359 self.client.settimeout(1)
1360 # synchronize by waiting for "220 ready" response
1361 self.client.recv(1024)
1362 self.sockno = self.client.fileno()
1363 self.file = open(support.TESTFN, 'rb')
1364 self.fileno = self.file.fileno()
1365
1366 def tearDown(self):
1367 self.file.close()
1368 self.client.close()
1369 if self.server.running:
1370 self.server.stop()
1371
1372 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
1373 """A higher level wrapper representing how an application is
1374 supposed to use sendfile().
1375 """
1376 while 1:
1377 try:
1378 if self.SUPPORT_HEADERS_TRAILERS:
1379 return os.sendfile(sock, file, offset, nbytes, headers,
1380 trailers)
1381 else:
1382 return os.sendfile(sock, file, offset, nbytes)
1383 except OSError as err:
1384 if err.errno == errno.ECONNRESET:
1385 # disconnected
1386 raise
1387 elif err.errno in (errno.EAGAIN, errno.EBUSY):
1388 # we have to retry send data
1389 continue
1390 else:
1391 raise
1392
def test_send_whole_file(self):
    # normal send: push the whole file through the socket in chunks of
    # at most nbytes, then check the server received exactly DATA
    total_sent = 0
    offset = 0
    nbytes = 4096
    while total_sent < len(self.DATA):
        sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
        if sent == 0:
            break
        offset += sent
        total_sent += sent
        # assertLessEqual shows both values on failure, unlike
        # assertTrue(sent <= nbytes)
        self.assertLessEqual(sent, nbytes)
        self.assertEqual(offset, total_sent)

    self.assertEqual(total_sent, len(self.DATA))
    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    data = self.server.handler_instance.get_data()
    # compare lengths first so a size mismatch is reported concisely
    self.assertEqual(len(data), len(self.DATA))
    self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001414
def test_send_at_certain_offset(self):
    # start sending a file at a certain offset (the middle of DATA) and
    # check the server received exactly the second half
    total_sent = 0
    start = len(self.DATA) // 2
    offset = start
    must_send = len(self.DATA) - start
    nbytes = 4096
    while total_sent < must_send:
        sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
        if sent == 0:
            break
        offset += sent
        total_sent += sent
        # assertLessEqual shows both values on failure, unlike
        # assertTrue(sent <= nbytes)
        self.assertLessEqual(sent, nbytes)

    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    data = self.server.handler_instance.get_data()
    expected = self.DATA[start:]
    self.assertEqual(total_sent, len(expected))
    # compare lengths first so a size mismatch is reported concisely
    self.assertEqual(len(data), len(expected))
    self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001437
def test_offset_overflow(self):
    # specify an offset > file size: nothing may reach the server
    past_eof = len(self.DATA) + 4096
    try:
        sent = os.sendfile(self.sockno, self.fileno, past_eof, 4096)
    except OSError as exc:
        # Solaris can raise EINVAL if offset >= file length, ignore.
        if exc.errno != errno.EINVAL:
            raise
    else:
        self.assertEqual(sent, 0)
    self.client.shutdown(socket.SHUT_RDWR)
    self.client.close()
    self.server.wait()
    received = self.server.handler_instance.get_data()
    self.assertEqual(received, b'')
1454
def test_invalid_offset(self):
    # a negative offset must be rejected with EINVAL
    with self.assertRaises(OSError) as ctx:
        os.sendfile(self.sockno, self.fileno, -1, 4096)
    raised = ctx.exception
    self.assertEqual(raised.errno, errno.EINVAL)
1459
1460 # --- headers / trailers tests
1461
1462 if SUPPORT_HEADERS_TRAILERS:
1463
def test_headers(self):
    # send the first chunk of the file preceded by a header, then the
    # rest of the file, and check the server saw header + DATA
    header = b"x" * 512
    expected_data = header + self.DATA
    nbytes = 4096
    sent = os.sendfile(self.sockno, self.fileno, 0, nbytes,
                       headers=[header])
    total_sent = sent
    # The count returned by the first call includes the header bytes
    # (the total asserted below is len(header) + len(DATA)).  Derive
    # the next file offset from what was actually sent instead of
    # assuming the first call always transfers a full nbytes chunk.
    self.assertGreaterEqual(sent, len(header))
    offset = sent - len(header)
    while True:
        sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                     offset, nbytes)
        if sent == 0:
            break
        total_sent += sent
        offset += sent

    self.assertEqual(total_sent, len(expected_data))
    self.client.close()
    self.server.wait()
    data = self.server.handler_instance.get_data()
    # Compare the payload itself rather than its hash; the length check
    # comes first so a size mismatch is reported concisely.
    self.assertEqual(len(data), len(expected_data))
    self.assertEqual(data, expected_data)
1485
def test_trailers(self):
    # a trailer must arrive right after the contents of the file
    TESTFN2 = support.TESTFN + "2"
    # Register the removal before the file is created so it cannot be
    # left behind if writing or reopening it fails.  support.unlink()
    # is used instead of os.remove() because the file may not exist
    # when the cleanup runs.
    self.addCleanup(support.unlink, TESTFN2)
    with open(TESTFN2, 'wb') as f:
        f.write(b"abcde")
    with open(TESTFN2, 'rb') as f:
        os.sendfile(self.sockno, f.fileno(), 0, 4096,
                    trailers=[b"12345"])
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b"abcde12345")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00001498
1499 if hasattr(os, "SF_NODISKIO"):
def test_flags(self):
    # Passing SF_NODISKIO must be accepted; EBUSY and EAGAIN are
    # tolerated outcomes, any other error fails the test.
    tolerated = (errno.EBUSY, errno.EAGAIN)
    try:
        os.sendfile(self.sockno, self.fileno, 0, 4096,
                    flags=os.SF_NODISKIO)
    except OSError as exc:
        if exc.errno in tolerated:
            return
        raise
1507
1508
@support.reap_threads
def test_main():
    # Hand every test case class of this module to the regrtest runner,
    # in the order listed here.
    test_classes = (
        FileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Pep383Tests,
        Win32KillTests,
        Win32SymlinkTests,
        FSEncodingTests,
        PidTests,
        LoginTests,
        LinkTests,
        TestSendfile,
        ProgramPriorityTests,
    )
    support.run_unittest(*test_classes)
Fred Drake2e2be372001-09-20 21:33:42 +00001533
1534if __name__ == "__main__":
1535 test_main()