blob: 0c633b4e8fa3fd466545ad6598487d3d3c679926 [file] [log] [blame]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00001import sys
2import os
3import unittest
Antoine Pitrou47a5f482009-06-12 20:41:52 +00004import itertools
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -07005import select
6import signal
7import subprocess
Antoine Pitrou47a5f482009-06-12 20:41:52 +00008import time
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00009from array import array
10from weakref import proxy
Victor Stinner6a102812010-04-27 23:55:59 +000011try:
12 import threading
13except ImportError:
14 threading = None
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000015
Antoine Pitrou47a5f482009-06-12 20:41:52 +000016from test import test_support
Georg Brandla4f46e12010-02-07 17:03:15 +000017from test.test_support import TESTFN, run_unittest
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000018from UserList import UserList
19
class AutoFileTests(unittest.TestCase):
    # File tests for which a test file is automatically set up.
    #
    # setUp() opens TESTFN for binary writing and stores it as self.f;
    # tearDown() closes self.f (unless a test set it to None) and removes
    # TESTFN.

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Drop the only strong reference so the proxy becomes dead.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        with test_support.check_py3k_warnings():
            softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        with test_support.check_py3k_warnings():
            # verify softspace is writable
            f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEqual('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testRepr(self):
        # verify repr works
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
        # see issue #14161
        # Windows doesn't like \r\n\t" in the file name, but ' is ok
        fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
        with open(fname, 'w') as f:
            self.addCleanup(os.remove, fname)
            self.assertTrue(repr(f).startswith(
                    "<open file %r, mode 'w' at" % fname))

    def testErrors(self):
        self.f.close()
        self.f = open(TESTFN, 'rb')
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', '__iter__']
        deprecated_methods = ['xreadlines']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        with test_support.check_py3k_warnings():
            for methodname in deprecated_methods:
                method = getattr(self.f, methodname)
                self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertIsNone(self.f.__exit__(None, None, None))
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            # Only the deliberately provoked error is wanted here; a bare
            # except would also swallow KeyboardInterrupt and friends.
            self.assertIsNone(self.f.__exit__(*sys.exc_info()))

    def testReadWhenWriting(self):
        # self.f is opened write-only by setUp(), so reading must fail.
        self.assertRaises(IOError, self.f.read)

    def testNastyWritelinesGenerator(self):
        # The generator closes the file half way through writelines().
        def nasty():
            for i in range(5):
                if i == 3:
                    self.f.close()
                yield str(i)
        self.assertRaises(ValueError, self.f.writelines, nasty())

    def testIssue5677(self):
        # Remark: Do not perform more than one test per open file,
        # since that does NOT catch the readline error on Windows.
        data = 'xxx'
        for mode in ['w', 'wb', 'a', 'ab']:
            for attr in ['read', 'readline', 'readlines']:
                self.f = open(TESTFN, mode)
                self.f.write(data)
                self.assertRaises(IOError, getattr(self.f, attr))
                self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, lambda: [line for line in self.f])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
            self.f.close()

        for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.write, data)
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.writelines, [data, data])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.truncate)
            self.f.close()
class OtherFileTests(unittest.TestCase):
    # File tests that manage TESTFN themselves (no automatic fixture).

    def testOpenDir(self):
        # Opening a directory must raise IOError carrying the directory name.
        this_dir = os.path.dirname(__file__) or os.curdir
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                f.close()
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # Close and remove the file even when the assertion fails.
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        self.addCleanup(test_support.unlink, TESTFN)
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # SF bug <http://www.python.org/sf/801631>
        # "file.truncate fault on windows"

        # Start from a clean slate.  The file may or may not have been left
        # behind by an earlier test, so a missing file must not be an error.
        test_support.unlink(TESTFN)

        def bug801631():
            with open(TESTFN, 'wb') as f:
                f.write('12345678901')   # 11 bytes

            # "with" guarantees the handle is closed before the file is
            # removed, which matters on Windows when a check below fails.
            with open(TESTFN, 'rb+') as f:
                data = f.read(5)
                if data != '12345':
                    self.fail("Read on file opened for update failed %r" % data)
                if f.tell() != 5:
                    self.fail("File pos after read wrong %d" % f.tell())

                f.truncate()
                if f.tell() != 5:
                    self.fail("File pos after ftruncate wrong %d" % f.tell())

            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            test_support.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation. People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        self.assertFalse(dataoffset % len(filler),
                         "dataoffset must be multiple of len(filler)")
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            with open(TESTFN, "w") as bag:
                bag.write(filler * nchunks)
                bag.writelines(testlines)
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                with open(TESTFN) as f:
                    if f.next() != filler:
                        self.fail("Broken testfile")
                    meth = getattr(f, methodname)
                    try:
                        meth(*args)
                    except ValueError:
                        pass
                    else:
                        self.fail("%s%r after next() didn't raise ValueError" %
                                  (methodname, args))

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner. Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            with open(TESTFN) as f:
                for _ in range(nchunks):
                    f.next()
                testline = testlines.pop(0)
                try:
                    line = f.readline()
                except ValueError:
                    self.fail("readline() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("readline() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                testline = testlines.pop(0)
                buf = array("c", "\x00" * len(testline))
                try:
                    f.readinto(buf)
                except ValueError:
                    self.fail("readinto() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                line = buf.tostring()
                if line != testline:
                    self.fail("readinto() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                testline = testlines.pop(0)
                try:
                    line = f.read(len(testline))
                except ValueError:
                    self.fail("read() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("read() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                try:
                    lines = f.readlines()
                except ValueError:
                    self.fail("readlines() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if lines != testlines:
                    # Report the values actually compared (the remaining
                    # lines), not the leftovers of the previous check.
                    self.fail("readlines() after next() with empty buffer "
                              "failed. Got %r, expected %r" %
                              (lines, testlines))
            # Reading after iteration hit EOF shouldn't hurt either
            with open(TESTFN) as f:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
        finally:
            # Tolerate a missing file so a failure while creating it is not
            # masked by an OSError raised from the cleanup.
            test_support.unlink(TESTFN)
417
class FileSubclassTests(unittest.TestCase):
    # Tests for user-defined subclasses of the builtin file type.

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                self.subclass_closed = True
                file.close(self)

        # Opening TESTFN in 'w' mode creates it; make sure it is removed
        # again whether or not the assertion below passes.
        self.addCleanup(test_support.unlink, TESTFN)
        with C(TESTFN, 'w') as f:
            pass
        self.assertTrue(f.subclass_closed)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000433
434
Victor Stinner6a102812010-04-27 23:55:59 +0000435@unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000436class FileThreadingTests(unittest.TestCase):
437 # These tests check the ability to call various methods of file objects
438 # (including close()) concurrently without crashing the Python interpreter.
439 # See #815646, #595601
440
441 def setUp(self):
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000442 self._threads = test_support.threading_setup()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000443 self.f = None
444 self.filename = TESTFN
445 with open(self.filename, "w") as f:
446 f.write("\n".join("0123456789"))
447 self._count_lock = threading.Lock()
448 self.close_count = 0
449 self.close_success_count = 0
Antoine Pitrou83137c22010-05-17 19:56:59 +0000450 self.use_buffering = False
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000451
452 def tearDown(self):
453 if self.f:
454 try:
455 self.f.close()
456 except (EnvironmentError, ValueError):
457 pass
458 try:
459 os.remove(self.filename)
460 except EnvironmentError:
461 pass
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000462 test_support.threading_cleanup(*self._threads)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000463
464 def _create_file(self):
Antoine Pitrou83137c22010-05-17 19:56:59 +0000465 if self.use_buffering:
466 self.f = open(self.filename, "w+", buffering=1024*16)
467 else:
468 self.f = open(self.filename, "w+")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000469
470 def _close_file(self):
471 with self._count_lock:
472 self.close_count += 1
473 self.f.close()
474 with self._count_lock:
475 self.close_success_count += 1
476
477 def _close_and_reopen_file(self):
478 self._close_file()
479 # if close raises an exception thats fine, self.f remains valid so
480 # we don't need to reopen.
481 self._create_file()
482
483 def _run_workers(self, func, nb_workers, duration=0.2):
484 with self._count_lock:
485 self.close_count = 0
486 self.close_success_count = 0
487 self.do_continue = True
488 threads = []
489 try:
490 for i in range(nb_workers):
491 t = threading.Thread(target=func)
492 t.start()
493 threads.append(t)
494 for _ in xrange(100):
495 time.sleep(duration/100)
496 with self._count_lock:
497 if self.close_count-self.close_success_count > nb_workers+1:
498 if test_support.verbose:
499 print 'Q',
500 break
501 time.sleep(duration)
502 finally:
503 self.do_continue = False
504 for t in threads:
505 t.join()
506
507 def _test_close_open_io(self, io_func, nb_workers=5):
508 def worker():
509 self._create_file()
510 funcs = itertools.cycle((
511 lambda: io_func(),
512 lambda: self._close_and_reopen_file(),
513 ))
514 for f in funcs:
515 if not self.do_continue:
516 break
517 try:
518 f()
519 except (IOError, ValueError):
520 pass
521 self._run_workers(worker, nb_workers)
522 if test_support.verbose:
523 # Useful verbose statistics when tuning this test to take
524 # less time to run but still ensuring that its still useful.
525 #
526 # the percent of close calls that raised an error
527 percent = 100. - 100.*self.close_success_count/self.close_count
528 print self.close_count, ('%.4f ' % percent),
529
530 def test_close_open(self):
531 def io_func():
532 pass
533 self._test_close_open_io(io_func)
534
535 def test_close_open_flush(self):
536 def io_func():
537 self.f.flush()
538 self._test_close_open_io(io_func)
539
540 def test_close_open_iter(self):
541 def io_func():
542 list(iter(self.f))
543 self._test_close_open_io(io_func)
544
545 def test_close_open_isatty(self):
546 def io_func():
547 self.f.isatty()
548 self._test_close_open_io(io_func)
549
550 def test_close_open_print(self):
551 def io_func():
552 print >> self.f, ''
553 self._test_close_open_io(io_func)
554
Antoine Pitrou83137c22010-05-17 19:56:59 +0000555 def test_close_open_print_buffered(self):
556 self.use_buffering = True
557 def io_func():
558 print >> self.f, ''
559 self._test_close_open_io(io_func)
560
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000561 def test_close_open_read(self):
562 def io_func():
563 self.f.read(0)
564 self._test_close_open_io(io_func)
565
566 def test_close_open_readinto(self):
567 def io_func():
568 a = array('c', 'xxxxx')
569 self.f.readinto(a)
570 self._test_close_open_io(io_func)
571
572 def test_close_open_readline(self):
573 def io_func():
574 self.f.readline()
575 self._test_close_open_io(io_func)
576
577 def test_close_open_readlines(self):
578 def io_func():
579 self.f.readlines()
580 self._test_close_open_io(io_func)
581
582 def test_close_open_seek(self):
583 def io_func():
584 self.f.seek(0, 0)
585 self._test_close_open_io(io_func)
586
587 def test_close_open_tell(self):
588 def io_func():
589 self.f.tell()
590 self._test_close_open_io(io_func)
591
592 def test_close_open_truncate(self):
593 def io_func():
594 self.f.truncate()
595 self._test_close_open_io(io_func)
596
597 def test_close_open_write(self):
598 def io_func():
599 self.f.write('')
600 self._test_close_open_io(io_func)
601
602 def test_close_open_writelines(self):
603 def io_func():
604 self.f.writelines('')
605 self._test_close_open_io(io_func)
606
607
@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
class TestFileSignalEINTR(unittest.TestCase):
    def _kill_and_reap(self, process):
        """Kill the child, drain its stderr, reap it and close its pipes.

        Used on the failure paths of _test_reading so that a failing test
        leaves neither a zombie process nor open pipe handles behind.

        Args:
            process: The subprocess.Popen object created by _test_reading
                (stdin, stdout and stderr are all pipes).

        Returns:
            Whatever was still readable from the child's stderr, for use in
            a failure message.
        """
        process.kill()
        try:
            return process.stderr.read()
        finally:
            process.wait()
            for pipe in (process.stdin, process.stdout, process.stderr):
                pipe.close()

    def _test_reading(self, data_to_write, read_and_verify_code, method_name,
                      universal_newlines=False):
        """Generic buffered read method test harness to verify EINTR behavior.

        Also validates that Python signal handlers are run during the read.

        Args:
            data_to_write: String to write to the child process for reading
                before sending it a signal, confirming the signal was handled,
                writing a final newline char and closing the infile pipe.
            read_and_verify_code: Single "line" of code to read from a file
                object named 'infile' and validate the result.  This will be
                executed as part of a python subprocess fed data_to_write.
            method_name: The name of the read method being tested, for use in
                an error message on failure.
            universal_newlines: If True, infile will be opened in universal
                newline mode in the child process.
        """
        if universal_newlines:
            # Test the \r\n -> \n conversion while we're at it.
            data_to_write = data_to_write.replace('\n', '\r\n')
            infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
        else:
            infile_setup_code = 'infile = sys.stdin'
        # Total pipe IO in this function is smaller than the minimum posix OS
        # pipe buffer size of 512 bytes.  No writer should block.
        assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'

        child_code = (
             'import os, signal, sys ;'
             'signal.signal('
                     'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
             + infile_setup_code + ' ;' +
             'assert isinstance(infile, file) ;'
             'sys.stderr.write("Go.\\n") ;'
             + read_and_verify_code)
        reader_process = subprocess.Popen(
                [sys.executable, '-c', child_code],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Wait for the signal handler to be installed.
        go = reader_process.stderr.read(4)
        if go != 'Go.\n':
            leftover = self._kill_and_reap(reader_process)
            self.fail('Error from %s process while awaiting "Go":\n%s' % (
                    method_name, go+leftover))
        reader_process.stdin.write(data_to_write)
        signals_sent = 0
        rlist = []
        # We don't know when the read_and_verify_code in our child is actually
        # executing within the read system call we want to interrupt.  This
        # loop waits for a bit before sending the first signal to increase
        # the likelihood of that.  Implementations without correct EINTR
        # and signal handling usually fail this test.
        while not rlist:
            rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
            reader_process.send_signal(signal.SIGINT)
            # Give the subprocess time to handle it before we loop around and
            # send another one.  On OSX the second signal happening close to
            # immediately after the first was causing the subprocess to crash
            # via the OS's default SIGINT handler.
            time.sleep(0.1)
            signals_sent += 1
            if signals_sent > 200:
                self._kill_and_reap(reader_process)
                self.fail("failed to handle signal during %s." % method_name)
        # This assumes anything unexpected that writes to stderr will also
        # write a newline.  That is true of the traceback printing code.
        signal_line = reader_process.stderr.readline()
        if signal_line != '$\n':
            leftover = self._kill_and_reap(reader_process)
            self.fail('Error from %s process while awaiting signal:\n%s' % (
                    method_name, signal_line+leftover))
        # We append a newline to our input so that a readline call can
        # end on its own before the EOF is seen.
        stdout, stderr = reader_process.communicate(input='\n')
        if reader_process.returncode != 0:
            self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
                    method_name, reader_process.returncode, stdout, stderr))

    def test_readline(self, universal_newlines=False):
        """file.readline must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=(
                        'line = infile.readline() ;'
                        'expected_line = "hello, world!\\n" ;'
                        'assert line == expected_line, ('
                        '"read %r expected %r" % (line, expected_line))'
                ),
                method_name='readline',
                universal_newlines=universal_newlines)

    def test_readline_with_universal_newlines(self):
        self.test_readline(universal_newlines=True)

    def test_readlines(self, universal_newlines=False):
        """file.readlines must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello\nworld!',
                read_and_verify_code=(
                        'lines = infile.readlines() ;'
                        'expected_lines = ["hello\\n", "world!\\n"] ;'
                        'assert lines == expected_lines, ('
                        '"readlines returned wrong data.\\n" '
                        '"got lines %r\\nexpected  %r" '
                        '% (lines, expected_lines))'
                ),
                method_name='readlines',
                universal_newlines=universal_newlines)

    def test_readlines_with_universal_newlines(self):
        self.test_readlines(universal_newlines=True)

    def test_readall(self):
        """Unbounded file.read() must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!abcdefghijklm',
                read_and_verify_code=(
                        'data = infile.read() ;'
                        'expected_data = "hello, world!abcdefghijklm\\n";'
                        'assert data == expected_data, ('
                        '"read %r expected %r" % (data, expected_data))'
                ),
                method_name='unbounded read')

    def test_readinto(self):
        """file.readinto must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=(
                        'data = bytearray(50) ;'
                        'num_read = infile.readinto(data) ;'
                        'expected_data = "hello, world!\\n";'
                        'assert data[:num_read] == expected_data, ('
                        '"read %r expected %r" % (data, expected_data))'
                ),
                method_name='readinto')
748
749
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000750class StdoutTests(unittest.TestCase):
751
752 def test_move_stdout_on_write(self):
753 # Issue 3242: sys.stdout can be replaced (and freed) during a
754 # print statement; prevent a segfault in this case
755 save_stdout = sys.stdout
756
757 class File:
758 def write(self, data):
759 if '\n' in data:
760 sys.stdout = save_stdout
761
762 try:
763 sys.stdout = File()
764 print "some text"
765 finally:
766 sys.stdout = save_stdout
767
768 def test_del_stdout_before_print(self):
769 # Issue 4597: 'print' with no argument wasn't reporting when
770 # sys.stdout was deleted.
771 save_stdout = sys.stdout
772 del sys.stdout
773 try:
774 print
775 except RuntimeError as e:
Ezio Melotti2623a372010-11-21 13:34:58 +0000776 self.assertEqual(str(e), "lost sys.stdout")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000777 else:
778 self.fail("Expected RuntimeError")
779 finally:
780 sys.stdout = save_stdout
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000781
Victor Stinnercaafd772010-09-08 10:51:01 +0000782 def test_unicode(self):
783 import subprocess
784
785 def get_message(encoding, *code):
786 code = '\n'.join(code)
787 env = os.environ.copy()
788 env['PYTHONIOENCODING'] = encoding
789 process = subprocess.Popen([sys.executable, "-c", code],
790 stdout=subprocess.PIPE, env=env)
791 stdout, stderr = process.communicate()
792 self.assertEqual(process.returncode, 0)
793 return stdout
794
795 def check_message(text, encoding, expected):
796 stdout = get_message(encoding,
797 "import sys",
798 "sys.stdout.write(%r)" % text,
799 "sys.stdout.flush()")
800 self.assertEqual(stdout, expected)
801
Victor Stinner3a68f912010-09-08 11:45:16 +0000802 # test the encoding
803 check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
804 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
805 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
Victor Stinnercaafd772010-09-08 10:51:01 +0000806
Victor Stinner3a68f912010-09-08 11:45:16 +0000807 # test the error handler
808 check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
809 check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
810 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
811
812 # test the buffer API
Victor Stinnercaafd772010-09-08 10:51:01 +0000813 for objtype in ('buffer', 'bytearray'):
814 stdout = get_message('ascii',
815 'import sys',
Victor Stinner3a68f912010-09-08 11:45:16 +0000816 r'sys.stdout.write(%s("\xe9"))' % objtype,
817 'sys.stdout.flush()')
818 self.assertEqual(stdout, "\xe9")
Victor Stinnercaafd772010-09-08 10:51:01 +0000819
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000820
def test_main():
    # Run every test case class of this module through run_unittest().
    test_classes = (
        AutoFileTests,
        OtherFileTests,
        FileSubclassTests,
        FileThreadingTests,
        TestFileSignalEINTR,
        StdoutTests,
    )
    try:
        run_unittest(*test_classes)
    finally:
        # Historically, these tests have been sloppy about removing TESTFN.
        # So get rid of it no matter what.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)


if __name__ == '__main__':
    test_main()