blob: 7e74e64df644b5ce676e0f1de2347b01ca16f3c3 [file] [log] [blame]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00001import sys
2import os
3import unittest
Antoine Pitrou47a5f482009-06-12 20:41:52 +00004import itertools
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -07005import select
6import signal
7import subprocess
Antoine Pitrou47a5f482009-06-12 20:41:52 +00008import time
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00009from array import array
10from weakref import proxy
Victor Stinner6a102812010-04-27 23:55:59 +000011try:
12 import threading
13except ImportError:
14 threading = None
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000015
Antoine Pitrou47a5f482009-06-12 20:41:52 +000016from test import test_support
Georg Brandla4f46e12010-02-07 17:03:15 +000017from test.test_support import TESTFN, run_unittest
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000018from UserList import UserList
19
20class AutoFileTests(unittest.TestCase):
21 # file tests for which a test file is automatically set up
22
23 def setUp(self):
Antoine Pitrou47a5f482009-06-12 20:41:52 +000024 self.f = open(TESTFN, 'wb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000025
26 def tearDown(self):
27 if self.f:
28 self.f.close()
29 os.remove(TESTFN)
30
31 def testWeakRefs(self):
32 # verify weak references
33 p = proxy(self.f)
Antoine Pitrou47a5f482009-06-12 20:41:52 +000034 p.write('teststring')
Ezio Melotti2623a372010-11-21 13:34:58 +000035 self.assertEqual(self.f.tell(), p.tell())
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000036 self.f.close()
37 self.f = None
38 self.assertRaises(ReferenceError, getattr, p, 'tell')
39
40 def testAttributes(self):
41 # verify expected attributes exist
42 f = self.f
Ezio Melottid80b4bf2010-03-17 13:52:48 +000043 with test_support.check_py3k_warnings():
44 softspace = f.softspace
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000045 f.name # merely shouldn't blow up
46 f.mode # ditto
47 f.closed # ditto
48
Ezio Melottid80b4bf2010-03-17 13:52:48 +000049 with test_support.check_py3k_warnings():
50 # verify softspace is writable
51 f.softspace = softspace # merely shouldn't blow up
Antoine Pitrou47a5f482009-06-12 20:41:52 +000052
53 # verify the others aren't
54 for attr in 'name', 'mode', 'closed':
55 self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
56
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000057 def testReadinto(self):
58 # verify readinto
Antoine Pitrou47a5f482009-06-12 20:41:52 +000059 self.f.write('12')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000060 self.f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +000061 a = array('c', 'x'*10)
62 self.f = open(TESTFN, 'rb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000063 n = self.f.readinto(a)
Ezio Melotti2623a372010-11-21 13:34:58 +000064 self.assertEqual('12', a.tostring()[:n])
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000065
66 def testWritelinesUserList(self):
67 # verify writelines with instance sequence
Antoine Pitrou47a5f482009-06-12 20:41:52 +000068 l = UserList(['1', '2'])
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000069 self.f.writelines(l)
70 self.f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +000071 self.f = open(TESTFN, 'rb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000072 buf = self.f.read()
Ezio Melotti2623a372010-11-21 13:34:58 +000073 self.assertEqual(buf, '12')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000074
75 def testWritelinesIntegers(self):
76 # verify writelines with integers
77 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
78
79 def testWritelinesIntegersUserList(self):
80 # verify writelines with integers in UserList
81 l = UserList([1,2,3])
82 self.assertRaises(TypeError, self.f.writelines, l)
83
84 def testWritelinesNonString(self):
85 # verify writelines with non-string object
86 class NonString:
87 pass
88
89 self.assertRaises(TypeError, self.f.writelines,
90 [NonString(), NonString()])
91
Antoine Pitrou47a5f482009-06-12 20:41:52 +000092 def testRepr(self):
93 # verify repr works
Benjamin Peterson5c8da862009-06-30 22:57:08 +000094 self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
Ezio Melotti11f8b682012-03-12 01:17:02 +020095 # see issue #14161
96 # Windows doesn't like \r\n\t" in the file name, but ' is ok
97 fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
98 with open(fname, 'w') as f:
99 self.addCleanup(os.remove, fname)
100 self.assertTrue(repr(f).startswith(
101 "<open file %r, mode 'w' at" % fname))
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000102
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000103 def testErrors(self):
Antoine Pitroubb445a12010-02-05 17:05:54 +0000104 self.f.close()
105 self.f = open(TESTFN, 'rb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000106 f = self.f
Ezio Melotti2623a372010-11-21 13:34:58 +0000107 self.assertEqual(f.name, TESTFN)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000108 self.assertTrue(not f.isatty())
109 self.assertTrue(not f.closed)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000110
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000111 self.assertRaises(TypeError, f.readinto, "")
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000112 f.close()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000113 self.assertTrue(f.closed)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000114
115 def testMethods(self):
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000116 methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
117 'readline', 'readlines', 'seek', 'tell', 'truncate',
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000118 'write', '__iter__']
119 deprecated_methods = ['xreadlines']
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000120 if sys.platform.startswith('atheos'):
121 methods.remove('truncate')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000122
123 # __exit__ should close the file
124 self.f.__exit__(None, None, None)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000125 self.assertTrue(self.f.closed)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000126
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000127 for methodname in methods:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000128 method = getattr(self.f, methodname)
129 # should raise on closed file
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000130 self.assertRaises(ValueError, method)
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000131 with test_support.check_py3k_warnings():
132 for methodname in deprecated_methods:
133 method = getattr(self.f, methodname)
134 self.assertRaises(ValueError, method)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000135 self.assertRaises(ValueError, self.f.writelines, [])
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000136
137 # file is closed, __exit__ shouldn't do anything
Ezio Melotti2623a372010-11-21 13:34:58 +0000138 self.assertEqual(self.f.__exit__(None, None, None), None)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000139 # it must also return None if an exception was given
140 try:
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000141 1 // 0
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000142 except:
Ezio Melotti2623a372010-11-21 13:34:58 +0000143 self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000144
145 def testReadWhenWriting(self):
146 self.assertRaises(IOError, self.f.read)
147
Benjamin Petersonbf775542010-10-16 19:20:12 +0000148 def testNastyWritelinesGenerator(self):
149 def nasty():
150 for i in range(5):
151 if i == 3:
152 self.f.close()
153 yield str(i)
154 self.assertRaises(ValueError, self.f.writelines, nasty())
155
Antoine Pitroubb445a12010-02-05 17:05:54 +0000156 def testIssue5677(self):
157 # Remark: Do not perform more than one test per open file,
158 # since that does NOT catch the readline error on Windows.
159 data = 'xxx'
160 for mode in ['w', 'wb', 'a', 'ab']:
161 for attr in ['read', 'readline', 'readlines']:
162 self.f = open(TESTFN, mode)
163 self.f.write(data)
164 self.assertRaises(IOError, getattr(self.f, attr))
165 self.f.close()
166
167 self.f = open(TESTFN, mode)
168 self.f.write(data)
169 self.assertRaises(IOError, lambda: [line for line in self.f])
170 self.f.close()
171
172 self.f = open(TESTFN, mode)
173 self.f.write(data)
174 self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
175 self.f.close()
176
177 for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
178 self.f = open(TESTFN, mode)
179 self.assertRaises(IOError, self.f.write, data)
180 self.f.close()
181
182 self.f = open(TESTFN, mode)
183 self.assertRaises(IOError, self.f.writelines, [data, data])
184 self.f.close()
185
186 self.f = open(TESTFN, mode)
187 self.assertRaises(IOError, self.f.truncate)
188 self.f.close()
189
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000190class OtherFileTests(unittest.TestCase):
191
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000192 def testOpenDir(self):
Hirokazu Yamamotofa647ec2010-09-23 15:59:21 +0000193 this_dir = os.path.dirname(__file__) or os.curdir
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000194 for mode in (None, "w"):
195 try:
196 if mode:
197 f = open(this_dir, mode)
198 else:
199 f = open(this_dir)
200 except IOError as e:
201 self.assertEqual(e.filename, this_dir)
202 else:
203 self.fail("opening a directory didn't raise an IOError")
204
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000205 def testModeStrings(self):
206 # check invalid mode strings
207 for mode in ("", "aU", "wU+"):
208 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000209 f = open(TESTFN, mode)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000210 except ValueError:
211 pass
212 else:
213 f.close()
214 self.fail('%r is an invalid file mode' % mode)
215
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000216 # Some invalid modes fail on Windows, but pass on Unix
217 # Issue3965: avoid a crash on Windows when filename is unicode
218 for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
219 try:
220 f = open(name, "rr")
221 except (IOError, ValueError):
222 pass
223 else:
224 f.close()
225
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000226 def testStdin(self):
227 # This causes the interpreter to exit on OSF1 v5.1.
228 if sys.platform != 'osf1V5':
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000229 self.assertRaises(IOError, sys.stdin.seek, -1)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000230 else:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000231 print >>sys.__stdout__, (
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000232 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000233 ' Test manually.')
234 self.assertRaises(IOError, sys.stdin.truncate)
235
236 def testUnicodeOpen(self):
237 # verify repr works for unicode too
238 f = open(unicode(TESTFN), "w")
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000239 self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000240 f.close()
241 os.unlink(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000242
243 def testBadModeArgument(self):
244 # verify that we get a sensible error message for bad mode argument
245 bad_mode = "qwerty"
246 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000247 f = open(TESTFN, bad_mode)
248 except ValueError, msg:
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000249 if msg.args[0] != 0:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000250 s = str(msg)
Ezio Melotti187f93d2010-03-17 14:22:34 +0000251 if TESTFN in s or bad_mode not in s:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000252 self.fail("bad error message for invalid mode: %s" % s)
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000253 # if msg.args[0] == 0, we're probably on Windows where there may
254 # be no obvious way to discover why open() failed.
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000255 else:
256 f.close()
257 self.fail("no error for invalid mode: %s" % bad_mode)
258
259 def testSetBufferSize(self):
260 # make sure that explicitly setting the buffer size doesn't cause
261 # misbehaviour especially with repeated close() calls
262 for s in (-1, 0, 1, 512):
263 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000264 f = open(TESTFN, 'w', s)
265 f.write(str(s))
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000266 f.close()
267 f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000268 f = open(TESTFN, 'r', s)
269 d = int(f.read())
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000270 f.close()
271 f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000272 except IOError, msg:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000273 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
Ezio Melotti2623a372010-11-21 13:34:58 +0000274 self.assertEqual(d, s)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000275
276 def testTruncateOnWindows(self):
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000277 os.unlink(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000278
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000279 def bug801631():
280 # SF bug <http://www.python.org/sf/801631>
281 # "file.truncate fault on windows"
282 f = open(TESTFN, 'wb')
283 f.write('12345678901') # 11 bytes
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000284 f.close()
285
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000286 f = open(TESTFN,'rb+')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000287 data = f.read(5)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000288 if data != '12345':
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000289 self.fail("Read on file opened for update failed %r" % data)
290 if f.tell() != 5:
291 self.fail("File pos after read wrong %d" % f.tell())
292
293 f.truncate()
294 if f.tell() != 5:
295 self.fail("File pos after ftruncate wrong %d" % f.tell())
296
297 f.close()
298 size = os.path.getsize(TESTFN)
299 if size != 5:
300 self.fail("File size after ftruncate wrong %d" % size)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000301
302 try:
303 bug801631()
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000304 finally:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000305 os.unlink(TESTFN)
306
307 def testIteration(self):
308 # Test the complex interaction when mixing file-iteration and the
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000309 # various read* methods. Ostensibly, the mixture could just be tested
310 # to work when it should work according to the Python language,
311 # instead of fail when it should fail according to the current CPython
312 # implementation. People don't always program Python the way they
313 # should, though, and the implemenation might change in subtle ways,
314 # so we explicitly test for errors, too; the test will just have to
315 # be updated when the implementation changes.
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000316 dataoffset = 16384
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000317 filler = "ham\n"
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000318 assert not dataoffset % len(filler), \
319 "dataoffset must be multiple of len(filler)"
320 nchunks = dataoffset // len(filler)
321 testlines = [
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000322 "spam, spam and eggs\n",
323 "eggs, spam, ham and spam\n",
324 "saussages, spam, spam and eggs\n",
325 "spam, ham, spam and eggs\n",
326 "spam, spam, spam, spam, spam, ham, spam\n",
327 "wonderful spaaaaaam.\n"
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000328 ]
329 methods = [("readline", ()), ("read", ()), ("readlines", ()),
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000330 ("readinto", (array("c", " "*100),))]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000331
332 try:
333 # Prepare the testfile
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000334 bag = open(TESTFN, "w")
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000335 bag.write(filler * nchunks)
336 bag.writelines(testlines)
337 bag.close()
338 # Test for appropriate errors mixing read* and iteration
339 for methodname, args in methods:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000340 f = open(TESTFN)
341 if f.next() != filler:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000342 self.fail, "Broken testfile"
343 meth = getattr(f, methodname)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000344 try:
345 meth(*args)
346 except ValueError:
347 pass
348 else:
349 self.fail("%s%r after next() didn't raise ValueError" %
350 (methodname, args))
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000351 f.close()
352
353 # Test to see if harmless (by accident) mixing of read* and
354 # iteration still works. This depends on the size of the internal
355 # iteration buffer (currently 8192,) but we can test it in a
356 # flexible manner. Each line in the bag o' ham is 4 bytes
357 # ("h", "a", "m", "\n"), so 4096 lines of that should get us
358 # exactly on the buffer boundary for any power-of-2 buffersize
359 # between 4 and 16384 (inclusive).
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000360 f = open(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000361 for i in range(nchunks):
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000362 f.next()
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000363 testline = testlines.pop(0)
364 try:
365 line = f.readline()
366 except ValueError:
367 self.fail("readline() after next() with supposedly empty "
368 "iteration-buffer failed anyway")
369 if line != testline:
370 self.fail("readline() after next() with empty buffer "
371 "failed. Got %r, expected %r" % (line, testline))
372 testline = testlines.pop(0)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000373 buf = array("c", "\x00" * len(testline))
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000374 try:
375 f.readinto(buf)
376 except ValueError:
377 self.fail("readinto() after next() with supposedly empty "
378 "iteration-buffer failed anyway")
379 line = buf.tostring()
380 if line != testline:
381 self.fail("readinto() after next() with empty buffer "
382 "failed. Got %r, expected %r" % (line, testline))
383
384 testline = testlines.pop(0)
385 try:
386 line = f.read(len(testline))
387 except ValueError:
388 self.fail("read() after next() with supposedly empty "
389 "iteration-buffer failed anyway")
390 if line != testline:
391 self.fail("read() after next() with empty buffer "
392 "failed. Got %r, expected %r" % (line, testline))
393 try:
394 lines = f.readlines()
395 except ValueError:
396 self.fail("readlines() after next() with supposedly empty "
397 "iteration-buffer failed anyway")
398 if lines != testlines:
399 self.fail("readlines() after next() with empty buffer "
400 "failed. Got %r, expected %r" % (line, testline))
401 # Reading after iteration hit EOF shouldn't hurt either
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000402 f = open(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000403 try:
404 for line in f:
405 pass
406 try:
407 f.readline()
408 f.readinto(buf)
409 f.read()
410 f.readlines()
411 except ValueError:
412 self.fail("read* failed after next() consumed file")
413 finally:
414 f.close()
415 finally:
416 os.unlink(TESTFN)
417
Serhiy Storchaka6d562312013-12-17 14:40:06 +0200418 @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
419 def test_write_full(self):
420 # Issue #17976
Serhiy Storchaka84e7e5f2013-12-17 14:53:32 +0200421 try:
422 f = open('/dev/full', 'w', 1)
423 except IOError:
424 self.skipTest("requires '/dev/full'")
425 try:
Serhiy Storchaka6d562312013-12-17 14:40:06 +0200426 with self.assertRaises(IOError):
427 f.write('hello')
428 f.write('\n')
Serhiy Storchaka84e7e5f2013-12-17 14:53:32 +0200429 finally:
430 f.close()
Serhiy Storchaka6d562312013-12-17 14:40:06 +0200431
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000432class FileSubclassTests(unittest.TestCase):
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000433
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000434 def testExit(self):
435 # test that exiting with context calls subclass' close
436 class C(file):
437 def __init__(self, *args):
438 self.subclass_closed = False
439 file.__init__(self, *args)
440 def close(self):
441 self.subclass_closed = True
442 file.close(self)
443
444 with C(TESTFN, 'w') as f:
445 pass
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000446 self.assertTrue(f.subclass_closed)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000447
448
Victor Stinner6a102812010-04-27 23:55:59 +0000449@unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000450class FileThreadingTests(unittest.TestCase):
451 # These tests check the ability to call various methods of file objects
452 # (including close()) concurrently without crashing the Python interpreter.
453 # See #815646, #595601
454
455 def setUp(self):
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000456 self._threads = test_support.threading_setup()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000457 self.f = None
458 self.filename = TESTFN
459 with open(self.filename, "w") as f:
460 f.write("\n".join("0123456789"))
461 self._count_lock = threading.Lock()
462 self.close_count = 0
463 self.close_success_count = 0
Antoine Pitrou83137c22010-05-17 19:56:59 +0000464 self.use_buffering = False
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000465
466 def tearDown(self):
467 if self.f:
468 try:
469 self.f.close()
470 except (EnvironmentError, ValueError):
471 pass
472 try:
473 os.remove(self.filename)
474 except EnvironmentError:
475 pass
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000476 test_support.threading_cleanup(*self._threads)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000477
478 def _create_file(self):
Antoine Pitrou83137c22010-05-17 19:56:59 +0000479 if self.use_buffering:
480 self.f = open(self.filename, "w+", buffering=1024*16)
481 else:
482 self.f = open(self.filename, "w+")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000483
484 def _close_file(self):
485 with self._count_lock:
486 self.close_count += 1
487 self.f.close()
488 with self._count_lock:
489 self.close_success_count += 1
490
491 def _close_and_reopen_file(self):
492 self._close_file()
493 # if close raises an exception thats fine, self.f remains valid so
494 # we don't need to reopen.
495 self._create_file()
496
497 def _run_workers(self, func, nb_workers, duration=0.2):
498 with self._count_lock:
499 self.close_count = 0
500 self.close_success_count = 0
501 self.do_continue = True
502 threads = []
503 try:
504 for i in range(nb_workers):
505 t = threading.Thread(target=func)
506 t.start()
507 threads.append(t)
508 for _ in xrange(100):
509 time.sleep(duration/100)
510 with self._count_lock:
511 if self.close_count-self.close_success_count > nb_workers+1:
512 if test_support.verbose:
513 print 'Q',
514 break
515 time.sleep(duration)
516 finally:
517 self.do_continue = False
518 for t in threads:
519 t.join()
520
521 def _test_close_open_io(self, io_func, nb_workers=5):
522 def worker():
523 self._create_file()
524 funcs = itertools.cycle((
525 lambda: io_func(),
526 lambda: self._close_and_reopen_file(),
527 ))
528 for f in funcs:
529 if not self.do_continue:
530 break
531 try:
532 f()
533 except (IOError, ValueError):
534 pass
535 self._run_workers(worker, nb_workers)
536 if test_support.verbose:
537 # Useful verbose statistics when tuning this test to take
538 # less time to run but still ensuring that its still useful.
539 #
540 # the percent of close calls that raised an error
541 percent = 100. - 100.*self.close_success_count/self.close_count
542 print self.close_count, ('%.4f ' % percent),
543
544 def test_close_open(self):
545 def io_func():
546 pass
547 self._test_close_open_io(io_func)
548
549 def test_close_open_flush(self):
550 def io_func():
551 self.f.flush()
552 self._test_close_open_io(io_func)
553
554 def test_close_open_iter(self):
555 def io_func():
556 list(iter(self.f))
557 self._test_close_open_io(io_func)
558
559 def test_close_open_isatty(self):
560 def io_func():
561 self.f.isatty()
562 self._test_close_open_io(io_func)
563
564 def test_close_open_print(self):
565 def io_func():
566 print >> self.f, ''
567 self._test_close_open_io(io_func)
568
Antoine Pitrou83137c22010-05-17 19:56:59 +0000569 def test_close_open_print_buffered(self):
570 self.use_buffering = True
571 def io_func():
572 print >> self.f, ''
573 self._test_close_open_io(io_func)
574
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000575 def test_close_open_read(self):
576 def io_func():
577 self.f.read(0)
578 self._test_close_open_io(io_func)
579
580 def test_close_open_readinto(self):
581 def io_func():
582 a = array('c', 'xxxxx')
583 self.f.readinto(a)
584 self._test_close_open_io(io_func)
585
586 def test_close_open_readline(self):
587 def io_func():
588 self.f.readline()
589 self._test_close_open_io(io_func)
590
591 def test_close_open_readlines(self):
592 def io_func():
593 self.f.readlines()
594 self._test_close_open_io(io_func)
595
596 def test_close_open_seek(self):
597 def io_func():
598 self.f.seek(0, 0)
599 self._test_close_open_io(io_func)
600
601 def test_close_open_tell(self):
602 def io_func():
603 self.f.tell()
604 self._test_close_open_io(io_func)
605
606 def test_close_open_truncate(self):
607 def io_func():
608 self.f.truncate()
609 self._test_close_open_io(io_func)
610
611 def test_close_open_write(self):
612 def io_func():
613 self.f.write('')
614 self._test_close_open_io(io_func)
615
616 def test_close_open_writelines(self):
617 def io_func():
618 self.f.writelines('')
619 self._test_close_open_io(io_func)
620
621
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -0700622@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
623class TestFileSignalEINTR(unittest.TestCase):
624 def _test_reading(self, data_to_write, read_and_verify_code, method_name,
625 universal_newlines=False):
626 """Generic buffered read method test harness to verify EINTR behavior.
627
628 Also validates that Python signal handlers are run during the read.
629
630 Args:
631 data_to_write: String to write to the child process for reading
632 before sending it a signal, confirming the signal was handled,
633 writing a final newline char and closing the infile pipe.
634 read_and_verify_code: Single "line" of code to read from a file
635 object named 'infile' and validate the result. This will be
636 executed as part of a python subprocess fed data_to_write.
637 method_name: The name of the read method being tested, for use in
638 an error message on failure.
639 universal_newlines: If True, infile will be opened in universal
640 newline mode in the child process.
641 """
642 if universal_newlines:
643 # Test the \r\n -> \n conversion while we're at it.
644 data_to_write = data_to_write.replace('\n', '\r\n')
645 infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
646 else:
647 infile_setup_code = 'infile = sys.stdin'
648 # Total pipe IO in this function is smaller than the minimum posix OS
649 # pipe buffer size of 512 bytes. No writer should block.
650 assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'
651
652 child_code = (
653 'import os, signal, sys ;'
654 'signal.signal('
655 'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
656 + infile_setup_code + ' ;' +
657 'assert isinstance(infile, file) ;'
658 'sys.stderr.write("Go.\\n") ;'
659 + read_and_verify_code)
660 reader_process = subprocess.Popen(
661 [sys.executable, '-c', child_code],
662 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
663 stderr=subprocess.PIPE)
664 # Wait for the signal handler to be installed.
665 go = reader_process.stderr.read(4)
666 if go != 'Go.\n':
667 reader_process.kill()
668 self.fail('Error from %s process while awaiting "Go":\n%s' % (
669 method_name, go+reader_process.stderr.read()))
670 reader_process.stdin.write(data_to_write)
671 signals_sent = 0
672 rlist = []
673 # We don't know when the read_and_verify_code in our child is actually
674 # executing within the read system call we want to interrupt. This
675 # loop waits for a bit before sending the first signal to increase
676 # the likelihood of that. Implementations without correct EINTR
677 # and signal handling usually fail this test.
678 while not rlist:
679 rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
680 reader_process.send_signal(signal.SIGINT)
681 # Give the subprocess time to handle it before we loop around and
682 # send another one. On OSX the second signal happening close to
683 # immediately after the first was causing the subprocess to crash
684 # via the OS's default SIGINT handler.
685 time.sleep(0.1)
686 signals_sent += 1
687 if signals_sent > 200:
688 reader_process.kill()
689 self.fail("failed to handle signal during %s." % method_name)
690 # This assumes anything unexpected that writes to stderr will also
691 # write a newline. That is true of the traceback printing code.
692 signal_line = reader_process.stderr.readline()
693 if signal_line != '$\n':
694 reader_process.kill()
695 self.fail('Error from %s process while awaiting signal:\n%s' % (
696 method_name, signal_line+reader_process.stderr.read()))
697 # We append a newline to our input so that a readline call can
698 # end on its own before the EOF is seen.
699 stdout, stderr = reader_process.communicate(input='\n')
700 if reader_process.returncode != 0:
701 self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
702 method_name, reader_process.returncode, stdout, stderr))
703
704 def test_readline(self, universal_newlines=False):
705 """file.readline must handle signals and not lose data."""
706 self._test_reading(
707 data_to_write='hello, world!',
708 read_and_verify_code=(
709 'line = infile.readline() ;'
710 'expected_line = "hello, world!\\n" ;'
711 'assert line == expected_line, ('
712 '"read %r expected %r" % (line, expected_line))'
713 ),
714 method_name='readline',
715 universal_newlines=universal_newlines)
716
717 def test_readline_with_universal_newlines(self):
718 self.test_readline(universal_newlines=True)
719
720 def test_readlines(self, universal_newlines=False):
721 """file.readlines must handle signals and not lose data."""
722 self._test_reading(
723 data_to_write='hello\nworld!',
724 read_and_verify_code=(
725 'lines = infile.readlines() ;'
726 'expected_lines = ["hello\\n", "world!\\n"] ;'
727 'assert lines == expected_lines, ('
728 '"readlines returned wrong data.\\n" '
729 '"got lines %r\\nexpected %r" '
730 '% (lines, expected_lines))'
731 ),
732 method_name='readlines',
733 universal_newlines=universal_newlines)
734
735 def test_readlines_with_universal_newlines(self):
736 self.test_readlines(universal_newlines=True)
737
738 def test_readall(self):
739 """Unbounded file.read() must handle signals and not lose data."""
740 self._test_reading(
741 data_to_write='hello, world!abcdefghijklm',
742 read_and_verify_code=(
743 'data = infile.read() ;'
744 'expected_data = "hello, world!abcdefghijklm\\n";'
745 'assert data == expected_data, ('
746 '"read %r expected %r" % (data, expected_data))'
747 ),
748 method_name='unbounded read')
749
750 def test_readinto(self):
751 """file.readinto must handle signals and not lose data."""
752 self._test_reading(
753 data_to_write='hello, world!',
754 read_and_verify_code=(
755 'data = bytearray(50) ;'
756 'num_read = infile.readinto(data) ;'
757 'expected_data = "hello, world!\\n";'
758 'assert data[:num_read] == expected_data, ('
759 '"read %r expected %r" % (data, expected_data))'
760 ),
761 method_name='readinto')
762
763
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000764class StdoutTests(unittest.TestCase):
765
766 def test_move_stdout_on_write(self):
767 # Issue 3242: sys.stdout can be replaced (and freed) during a
768 # print statement; prevent a segfault in this case
769 save_stdout = sys.stdout
770
771 class File:
772 def write(self, data):
773 if '\n' in data:
774 sys.stdout = save_stdout
775
776 try:
777 sys.stdout = File()
778 print "some text"
779 finally:
780 sys.stdout = save_stdout
781
782 def test_del_stdout_before_print(self):
783 # Issue 4597: 'print' with no argument wasn't reporting when
784 # sys.stdout was deleted.
785 save_stdout = sys.stdout
786 del sys.stdout
787 try:
788 print
789 except RuntimeError as e:
Ezio Melotti2623a372010-11-21 13:34:58 +0000790 self.assertEqual(str(e), "lost sys.stdout")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000791 else:
792 self.fail("Expected RuntimeError")
793 finally:
794 sys.stdout = save_stdout
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000795
Victor Stinnercaafd772010-09-08 10:51:01 +0000796 def test_unicode(self):
797 import subprocess
798
799 def get_message(encoding, *code):
800 code = '\n'.join(code)
801 env = os.environ.copy()
802 env['PYTHONIOENCODING'] = encoding
803 process = subprocess.Popen([sys.executable, "-c", code],
804 stdout=subprocess.PIPE, env=env)
805 stdout, stderr = process.communicate()
806 self.assertEqual(process.returncode, 0)
807 return stdout
808
809 def check_message(text, encoding, expected):
810 stdout = get_message(encoding,
811 "import sys",
812 "sys.stdout.write(%r)" % text,
813 "sys.stdout.flush()")
814 self.assertEqual(stdout, expected)
815
Victor Stinner3a68f912010-09-08 11:45:16 +0000816 # test the encoding
817 check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
818 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
819 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
Victor Stinnercaafd772010-09-08 10:51:01 +0000820
Victor Stinner3a68f912010-09-08 11:45:16 +0000821 # test the error handler
822 check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
823 check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
824 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
825
826 # test the buffer API
Victor Stinnercaafd772010-09-08 10:51:01 +0000827 for objtype in ('buffer', 'bytearray'):
828 stdout = get_message('ascii',
829 'import sys',
Victor Stinner3a68f912010-09-08 11:45:16 +0000830 r'sys.stdout.write(%s("\xe9"))' % objtype,
831 'sys.stdout.flush()')
832 self.assertEqual(stdout, "\xe9")
Victor Stinnercaafd772010-09-08 10:51:01 +0000833
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000834
835def test_main():
836 # Historically, these tests have been sloppy about removing TESTFN.
837 # So get rid of it no matter what.
838 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000839 run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -0700840 FileThreadingTests, TestFileSignalEINTR, StdoutTests)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000841 finally:
842 if os.path.exists(TESTFN):
843 os.unlink(TESTFN)
844
845if __name__ == '__main__':
846 test_main()