blob: fae1db6acc2eda5dd63ca82d075ede261d30c1de [file] [log] [blame]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00001import sys
2import os
3import unittest
Antoine Pitrou47a5f482009-06-12 20:41:52 +00004import itertools
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -07005import select
6import signal
7import subprocess
Antoine Pitrou47a5f482009-06-12 20:41:52 +00008import time
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00009from array import array
10from weakref import proxy
Victor Stinner6a102812010-04-27 23:55:59 +000011try:
12 import threading
13except ImportError:
14 threading = None
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000015
Antoine Pitrou47a5f482009-06-12 20:41:52 +000016from test import test_support
Georg Brandla4f46e12010-02-07 17:03:15 +000017from test.test_support import TESTFN, run_unittest
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000018from UserList import UserList
19
class AutoFileTests(unittest.TestCase):
    # file tests for which a test file is automatically set up
    #
    # setUp() opens TESTFN for binary writing as self.f; tearDown() closes
    # it (unless a test has set self.f to None) and removes the file.

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # Tests that drop their reference set self.f to None; a closed file
        # object is still truthy and closing it again is harmless.
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Drop the only strong reference so the proxy's referent goes away.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        with test_support.check_py3k_warnings():
            softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        with test_support.check_py3k_warnings():
            # verify softspace is writable
            f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        # Only the first n items of the array were filled from the file.
        self.assertEqual('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testWritelinesBuffer(self):
        # verify writelines accepts an array('c') item and writes its content
        self.f.writelines([array('c', 'abc')])
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, 'abc')

    def testRepr(self):
        # verify repr works
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
        # see issue #14161
        # Windows doesn't like \r\n\t" in the file name, but ' is ok
        fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
        with open(fname, 'w') as f:
            self.addCleanup(os.remove, fname)
            self.assertTrue(repr(f).startswith(
                    "<open file %r, mode 'w' at" % fname))

    def testErrors(self):
        # verify basic state of a file reopened for reading, and that
        # readinto() rejects an immutable string target
        self.f.close()
        self.f = open(TESTFN, 'rb')
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # verify that every I/O method raises ValueError on a closed file
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', '__iter__']
        deprecated_methods = ['xreadlines']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        with test_support.check_py3k_warnings():
            for methodname in deprecated_methods:
                method = getattr(self.f, methodname)
                self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertEqual(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            # Only the deliberately provoked error is caught here, so an
            # unrelated failure (or KeyboardInterrupt) is not swallowed.
            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        # verify reading from a write-only file raises IOError
        self.assertRaises(IOError, self.f.read)

    def testNastyWritelinesGenerator(self):
        # verify writelines copes with an iterable that closes the file
        # part-way through being consumed
        def nasty():
            for i in range(5):
                if i == 3:
                    self.f.close()
                yield str(i)
        self.assertRaises(ValueError, self.f.writelines, nasty())

    def testIssue5677(self):
        # Remark: Do not perform more than one test per open file,
        # since that does NOT catch the readline error on Windows.
        data = 'xxx'
        # Reading from a file opened in any write/append mode must fail.
        for mode in ['w', 'wb', 'a', 'ab']:
            for attr in ['read', 'readline', 'readlines']:
                self.f = open(TESTFN, mode)
                self.f.write(data)
                self.assertRaises(IOError, getattr(self.f, attr))
                self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, lambda: [line for line in self.f])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
            self.f.close()

        # Writing to a file opened in any read-only mode must fail.
        for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.write, data)
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.writelines, [data, data])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.truncate)
            self.f.close()
196
class OtherFileTests(unittest.TestCase):
    # file tests that create (and are responsible for removing) their own
    # files; there is no shared setUp/tearDown here.

    def testOpenDir(self):
        # verify that opening a directory raises IOError naming the directory
        this_dir = os.path.dirname(__file__) or os.curdir
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                # Don't leak the handle when reporting the failure.
                f.close()
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # Clean up even when the assertion fails.
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()   # the second close() is deliberate
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()   # ditto
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # Start from a clean slate.  TESTFN only exists here if an earlier
        # test left it behind, so don't fail when it is already gone.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            f = open(TESTFN,'rb+')
            try:
                data = f.read(5)
                if data != '12345':
                    self.fail("Read on file opened for update failed %r" % data)
                if f.tell() != 5:
                    self.fail("File pos after read wrong %d" % f.tell())

                f.truncate()
                if f.tell() != 5:
                    self.fail("File pos after ftruncate wrong %d" % f.tell())
            finally:
                # Close before the caller unlinks the file; an open handle
                # would make the unlink fail on Windows.
                f.close()

            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                with open(TESTFN) as f:
                    if f.next() != filler:
                        self.fail("Broken testfile")
                    meth = getattr(f, methodname)
                    try:
                        meth(*args)
                    except ValueError:
                        pass
                    else:
                        self.fail("%s%r after next() didn't raise ValueError" %
                                  (methodname, args))

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            with open(TESTFN) as f:
                for i in range(nchunks):
                    f.next()
                testline = testlines.pop(0)
                try:
                    line = f.readline()
                except ValueError:
                    self.fail("readline() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("readline() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                testline = testlines.pop(0)
                buf = array("c", "\x00" * len(testline))
                try:
                    f.readinto(buf)
                except ValueError:
                    self.fail("readinto() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                line = buf.tostring()
                if line != testline:
                    self.fail("readinto() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                testline = testlines.pop(0)
                try:
                    line = f.read(len(testline))
                except ValueError:
                    self.fail("read() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("read() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                try:
                    lines = f.readlines()
                except ValueError:
                    self.fail("readlines() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if lines != testlines:
                    # Report the lists that were actually compared.
                    self.fail("readlines() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (lines, testlines))
            # Reading after iteration hit EOF shouldn't hurt either
            with open(TESTFN) as f:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
        finally:
            os.unlink(TESTFN)

    @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
    def test_write_full(self):
        # Issue #17976
        # Writing to a line-buffered /dev/full must surface an IOError.
        try:
            f = open('/dev/full', 'w', 1)
        except IOError:
            self.skipTest("requires '/dev/full'")
        try:
            with self.assertRaises(IOError):
                f.write('hello')
                f.write('\n')
        finally:
            f.close()
Serhiy Storchaka6d562312013-12-17 14:40:06 +0200438
class FileSubclassTests(unittest.TestCase):
    # tests for user-defined subclasses of the builtin file type

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                # Record that the override ran, then really close the file.
                self.subclass_closed = True
                file.close(self)

        with C(TESTFN, 'w') as f:
            # The file exists from here on; make sure this test removes it
            # itself instead of leaving it for a later cleanup pass.
            self.addCleanup(os.remove, TESTFN)
        self.assertTrue(f.subclass_closed)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000454
455
@unittest.skipUnless(threading, 'Threading required for this test.')
class FileThreadingTests(unittest.TestCase):
    # These tests check the ability to call various methods of file objects
    # (including close()) concurrently without crashing the Python interpreter.
    # See #815646, #595601
    #
    # Each test_close_open_* method supplies a small io_func that touches
    # self.f; _test_close_open_io() runs it from several threads, alternating
    # it with closing and reopening the shared file object.

    def setUp(self):
        self._threads = test_support.threading_setup()
        self.f = None
        self.filename = TESTFN
        # Seed the file with the digits 0-9 joined by newlines.
        with open(self.filename, "w") as f:
            f.write("\n".join("0123456789"))
        # Guards close_count / close_success_count, which the worker
        # threads update.
        self._count_lock = threading.Lock()
        self.close_count = 0
        self.close_success_count = 0
        self.use_buffering = False

    def tearDown(self):
        # Best-effort cleanup: errors from closing or removing the file are
        # deliberately ignored here.
        if self.f:
            try:
                self.f.close()
            except (EnvironmentError, ValueError):
                pass
        try:
            os.remove(self.filename)
        except EnvironmentError:
            pass
        test_support.threading_cleanup(*self._threads)

    def _create_file(self):
        # (Re)bind self.f to a freshly opened "w+" file, with an explicit
        # 16 KiB buffer when the test asked for buffering.
        if self.use_buffering:
            self.f = open(self.filename, "w+", buffering=1024*16)
        else:
            self.f = open(self.filename, "w+")

    def _close_file(self):
        # close_count is bumped before the close() attempt and
        # close_success_count only after it returns, so the difference
        # counts close() calls that raised or have not finished yet.
        with self._count_lock:
            self.close_count += 1
        self.f.close()
        with self._count_lock:
            self.close_success_count += 1

    def _close_and_reopen_file(self):
        self._close_file()
        # if close raises an exception that's fine, self.f remains valid so
        # we don't need to reopen.
        self._create_file()

    def _run_workers(self, func, nb_workers, duration=0.2):
        # Run func in nb_workers threads.  The threads are told to stop
        # (via self.do_continue) and are joined before this returns, even
        # if an exception occurs while they are running.
        with self._count_lock:
            self.close_count = 0
            self.close_success_count = 0
        self.do_continue = True
        threads = []
        try:
            for i in range(nb_workers):
                t = threading.Thread(target=func)
                t.start()
                threads.append(t)
            # Poll in 100 slices of `duration`, leaving the loop early once
            # the gap between attempted and completed closes exceeds
            # nb_workers+1.
            for _ in xrange(100):
                time.sleep(duration/100)
                with self._count_lock:
                    if self.close_count-self.close_success_count > nb_workers+1:
                        if test_support.verbose:
                            print 'Q',
                        break
            time.sleep(duration)
        finally:
            self.do_continue = False
            for t in threads:
                t.join()

    def _test_close_open_io(self, io_func, nb_workers=5):
        # Shared driver: every worker endlessly alternates io_func() with
        # close-and-reopen of self.f until self.do_continue is cleared.
        def worker():
            self._create_file()
            funcs = itertools.cycle((
                lambda: io_func(),
                lambda: self._close_and_reopen_file(),
            ))
            for f in funcs:
                if not self.do_continue:
                    break
                try:
                    f()
                except (IOError, ValueError):
                    # Expected when another thread closed the file under
                    # us; the test only cares that nothing crashes.
                    pass
        self._run_workers(worker, nb_workers)
        if test_support.verbose:
            # Useful verbose statistics when tuning this test to take
            # less time to run but still ensuring that its still useful.
            #
            # the percent of close calls that raised an error
            percent = 100. - 100.*self.close_success_count/self.close_count
            print self.close_count, ('%.4f ' % percent),

    def test_close_open(self):
        # no I/O at all: only concurrent close/reopen
        def io_func():
            pass
        self._test_close_open_io(io_func)

    def test_close_open_flush(self):
        def io_func():
            self.f.flush()
        self._test_close_open_io(io_func)

    def test_close_open_iter(self):
        def io_func():
            list(iter(self.f))
        self._test_close_open_io(io_func)

    def test_close_open_isatty(self):
        def io_func():
            self.f.isatty()
        self._test_close_open_io(io_func)

    def test_close_open_print(self):
        def io_func():
            print >> self.f, ''
        self._test_close_open_io(io_func)

    def test_close_open_print_buffered(self):
        # same as test_close_open_print, but _create_file() will open the
        # file with an explicit buffer size
        self.use_buffering = True
        def io_func():
            print >> self.f, ''
        self._test_close_open_io(io_func)

    def test_close_open_read(self):
        def io_func():
            self.f.read(0)
        self._test_close_open_io(io_func)

    def test_close_open_readinto(self):
        def io_func():
            a = array('c', 'xxxxx')
            self.f.readinto(a)
        self._test_close_open_io(io_func)

    def test_close_open_readline(self):
        def io_func():
            self.f.readline()
        self._test_close_open_io(io_func)

    def test_close_open_readlines(self):
        def io_func():
            self.f.readlines()
        self._test_close_open_io(io_func)

    def test_close_open_seek(self):
        def io_func():
            self.f.seek(0, 0)
        self._test_close_open_io(io_func)

    def test_close_open_tell(self):
        def io_func():
            self.f.tell()
        self._test_close_open_io(io_func)

    def test_close_open_truncate(self):
        def io_func():
            self.f.truncate()
        self._test_close_open_io(io_func)

    def test_close_open_write(self):
        def io_func():
            self.f.write('')
        self._test_close_open_io(io_func)

    def test_close_open_writelines(self):
        def io_func():
            self.f.writelines('')
        self._test_close_open_io(io_func)
627
628
@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
class TestFileSignalEINTR(unittest.TestCase):
    # Verify that buffered reads on file objects survive being interrupted
    # by a signal (EINTR) without losing data, by driving a child
    # interpreter over pipes and sending it SIGINT while it reads.

    def _test_reading(self, data_to_write, read_and_verify_code, method_name,
                      universal_newlines=False):
        """Generic buffered read method test harness to verify EINTR behavior.

        Also validates that Python signal handlers are run during the read.

        Args:
            data_to_write: String to write to the child process for reading
                before sending it a signal, confirming the signal was handled,
                writing a final newline char and closing the infile pipe.
            read_and_verify_code: Single "line" of code to read from a file
                object named 'infile' and validate the result.  This will be
                executed as part of a python subprocess fed data_to_write.
            method_name: The name of the read method being tested, for use in
                an error message on failure.
            universal_newlines: If True, infile will be opened in universal
                newline mode in the child process.

        The child process is always reaped and its pipes closed before this
        method returns, whether the test passes or fails.
        """
        if universal_newlines:
            # Test the \r\n -> \n conversion while we're at it.
            data_to_write = data_to_write.replace('\n', '\r\n')
            infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
        else:
            infile_setup_code = 'infile = sys.stdin'
        # Total pipe IO in this function is smaller than the minimum posix OS
        # pipe buffer size of 512 bytes.  No writer should block.
        assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'

        child_code = (
             'import os, signal, sys ;'
             'signal.signal('
                     'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
             + infile_setup_code + ' ;' +
             'assert isinstance(infile, file) ;'
             'sys.stderr.write("Go.\\n") ;'
             + read_and_verify_code)
        reader_process = subprocess.Popen(
                [sys.executable, '-c', child_code],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        try:
            # Wait for the signal handler to be installed.
            go = reader_process.stderr.read(4)
            if go != 'Go.\n':
                reader_process.kill()
                self.fail('Error from %s process while awaiting "Go":\n%s' % (
                        method_name, go+reader_process.stderr.read()))
            reader_process.stdin.write(data_to_write)
            signals_sent = 0
            rlist = []
            # We don't know when the read_and_verify_code in our child is
            # actually executing within the read system call we want to
            # interrupt.  This loop waits for a bit before sending the first
            # signal to increase the likelihood of that.  Implementations
            # without correct EINTR and signal handling usually fail this
            # test.
            while not rlist:
                rlist, _, _ = select.select([reader_process.stderr], (), (),
                                            0.05)
                reader_process.send_signal(signal.SIGINT)
                # Give the subprocess time to handle it before we loop around
                # and send another one.  On OSX the second signal happening
                # close to immediately after the first was causing the
                # subprocess to crash via the OS's default SIGINT handler.
                time.sleep(0.1)
                signals_sent += 1
                if signals_sent > 200:
                    reader_process.kill()
                    self.fail("failed to handle signal during %s." %
                              method_name)
            # This assumes anything unexpected that writes to stderr will also
            # write a newline.  That is true of the traceback printing code.
            signal_line = reader_process.stderr.readline()
            if signal_line != '$\n':
                reader_process.kill()
                self.fail('Error from %s process while awaiting signal:\n%s' % (
                        method_name, signal_line+reader_process.stderr.read()))
            # We append a newline to our input so that a readline call can
            # end on its own before the EOF is seen.
            stdout, stderr = reader_process.communicate(input='\n')
            if reader_process.returncode != 0:
                self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
                        method_name, reader_process.returncode, stdout, stderr))
        finally:
            # Never leave a running or zombie child, or open pipe
            # descriptors, behind -- in particular on the failure paths
            # above, which only kill() the child.
            if reader_process.poll() is None:
                reader_process.kill()
            for pipe in (reader_process.stdin, reader_process.stdout,
                         reader_process.stderr):
                try:
                    pipe.close()
                except EnvironmentError:
                    # Best effort: a cleanup error must not mask the real
                    # test outcome.
                    pass
            reader_process.wait()

    def test_readline(self, universal_newlines=False):
        """file.readline must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=(
                        'line = infile.readline() ;'
                        'expected_line = "hello, world!\\n" ;'
                        'assert line == expected_line, ('
                        '"read %r expected %r" % (line, expected_line))'
                ),
                method_name='readline',
                universal_newlines=universal_newlines)

    def test_readline_with_universal_newlines(self):
        self.test_readline(universal_newlines=True)

    def test_readlines(self, universal_newlines=False):
        """file.readlines must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello\nworld!',
                read_and_verify_code=(
                        'lines = infile.readlines() ;'
                        'expected_lines = ["hello\\n", "world!\\n"] ;'
                        'assert lines == expected_lines, ('
                        '"readlines returned wrong data.\\n" '
                        '"got lines %r\\nexpected %r" '
                        '% (lines, expected_lines))'
                ),
                method_name='readlines',
                universal_newlines=universal_newlines)

    def test_readlines_with_universal_newlines(self):
        self.test_readlines(universal_newlines=True)

    def test_readall(self):
        """Unbounded file.read() must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!abcdefghijklm',
                read_and_verify_code=(
                        'data = infile.read() ;'
                        'expected_data = "hello, world!abcdefghijklm\\n";'
                        'assert data == expected_data, ('
                        '"read %r expected %r" % (data, expected_data))'
                ),
                method_name='unbounded read')

    def test_readinto(self):
        """file.readinto must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=(
                        'data = bytearray(50) ;'
                        'num_read = infile.readinto(data) ;'
                        'expected_data = "hello, world!\\n";'
                        'assert data[:num_read] == expected_data, ('
                        '"read %r expected %r" % (data, expected_data))'
                ),
                method_name='readinto')
769
770
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000771class StdoutTests(unittest.TestCase):
772
773 def test_move_stdout_on_write(self):
774 # Issue 3242: sys.stdout can be replaced (and freed) during a
775 # print statement; prevent a segfault in this case
776 save_stdout = sys.stdout
777
778 class File:
779 def write(self, data):
780 if '\n' in data:
781 sys.stdout = save_stdout
782
783 try:
784 sys.stdout = File()
785 print "some text"
786 finally:
787 sys.stdout = save_stdout
788
789 def test_del_stdout_before_print(self):
790 # Issue 4597: 'print' with no argument wasn't reporting when
791 # sys.stdout was deleted.
792 save_stdout = sys.stdout
793 del sys.stdout
794 try:
795 print
796 except RuntimeError as e:
Ezio Melotti2623a372010-11-21 13:34:58 +0000797 self.assertEqual(str(e), "lost sys.stdout")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000798 else:
799 self.fail("Expected RuntimeError")
800 finally:
801 sys.stdout = save_stdout
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000802
Victor Stinnercaafd772010-09-08 10:51:01 +0000803 def test_unicode(self):
804 import subprocess
805
806 def get_message(encoding, *code):
807 code = '\n'.join(code)
808 env = os.environ.copy()
809 env['PYTHONIOENCODING'] = encoding
810 process = subprocess.Popen([sys.executable, "-c", code],
811 stdout=subprocess.PIPE, env=env)
812 stdout, stderr = process.communicate()
813 self.assertEqual(process.returncode, 0)
814 return stdout
815
816 def check_message(text, encoding, expected):
817 stdout = get_message(encoding,
818 "import sys",
819 "sys.stdout.write(%r)" % text,
820 "sys.stdout.flush()")
821 self.assertEqual(stdout, expected)
822
Victor Stinner3a68f912010-09-08 11:45:16 +0000823 # test the encoding
824 check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
825 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
826 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
Victor Stinnercaafd772010-09-08 10:51:01 +0000827
Victor Stinner3a68f912010-09-08 11:45:16 +0000828 # test the error handler
829 check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
830 check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
831 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
832
833 # test the buffer API
Victor Stinnercaafd772010-09-08 10:51:01 +0000834 for objtype in ('buffer', 'bytearray'):
835 stdout = get_message('ascii',
836 'import sys',
Victor Stinner3a68f912010-09-08 11:45:16 +0000837 r'sys.stdout.write(%s("\xe9"))' % objtype,
838 'sys.stdout.flush()')
839 self.assertEqual(stdout, "\xe9")
Victor Stinnercaafd772010-09-08 10:51:01 +0000840
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000841
def test_main():
    # Run every test class in this module.
    #
    # Historically, these tests have been sloppy about removing TESTFN.
    # So get rid of it no matter what.
    test_classes = (
        AutoFileTests,
        OtherFileTests,
        FileSubclassTests,
        FileThreadingTests,
        TestFileSignalEINTR,
        StdoutTests,
    )
    try:
        run_unittest(*test_classes)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

if __name__ == '__main__':
    test_main()