blob: 5b90852b070359fc8a5385ad268947580510ea78 [file] [log] [blame]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00001import sys
2import os
3import unittest
Antoine Pitrou47a5f482009-06-12 20:41:52 +00004import itertools
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -07005import select
6import signal
7import subprocess
Antoine Pitrou47a5f482009-06-12 20:41:52 +00008import time
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00009from array import array
10from weakref import proxy
Victor Stinner6a102812010-04-27 23:55:59 +000011try:
12 import threading
13except ImportError:
14 threading = None
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000015
Antoine Pitrou47a5f482009-06-12 20:41:52 +000016from test import test_support
Georg Brandla4f46e12010-02-07 17:03:15 +000017from test.test_support import TESTFN, run_unittest
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000018from UserList import UserList
19
class AutoFileTests(unittest.TestCase):
    # Tests that operate on a scratch file prepared by setUp(): self.f is
    # TESTFN opened for binary writing, and tearDown() closes and removes it.

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # Tests that deliberately drop their handle set self.f to None.
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # A weak proxy forwards calls while the file object is alive and
        # raises ReferenceError after the test drops its reference.
        weak = proxy(self.f)
        weak.write('teststring')
        self.assertEqual(self.f.tell(), weak.tell())
        self.f.close()
        self.f = None
        self.assertRaises(ReferenceError, getattr, weak, 'tell')

    def testAttributes(self):
        # The documented attributes must exist; only softspace is writable.
        fobj = self.f
        with test_support.check_py3k_warnings():
            saved_softspace = fobj.softspace
        # Reading these merely shouldn't blow up.
        for readable in ('name', 'mode', 'closed'):
            getattr(fobj, readable)

        with test_support.check_py3k_warnings():
            # Assigning softspace merely shouldn't blow up either.
            fobj.softspace = saved_softspace

        # The remaining attributes must reject assignment.
        for readonly in ('name', 'mode', 'closed'):
            self.assertRaises((AttributeError, TypeError),
                              setattr, fobj, readonly, 'oops')

    def testReadinto(self):
        # readinto() fills a writable buffer and reports the byte count.
        self.f.write('12')
        self.f.close()
        self.f = open(TESTFN, 'rb')
        target = array('c', 'x' * 10)
        count = self.f.readinto(target)
        self.assertEqual('12', target.tostring()[:count])

    def testWritelinesUserList(self):
        # writelines() accepts a sequence that is a class instance.
        self.f.writelines(UserList(['1', '2']))
        self.f.close()
        self.f = open(TESTFN, 'rb')
        self.assertEqual(self.f.read(), '12')

    def testWritelinesIntegers(self):
        # writelines() rejects a list of integers.
        with self.assertRaises(TypeError):
            self.f.writelines([1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # writelines() rejects integers wrapped in a UserList as well.
        numbers = UserList([1, 2, 3])
        with self.assertRaises(TypeError):
            self.f.writelines(numbers)

    def testWritelinesNonString(self):
        # writelines() rejects arbitrary non-string objects.
        class Opaque:
            pass

        with self.assertRaises(TypeError):
            self.f.writelines([Opaque(), Opaque()])

    def testRepr(self):
        # repr() of an open file starts with a fixed, quoted prefix.
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
        # see issue #14161
        # Windows doesn't like \r\n\t" in the file name, but ' is ok
        if sys.platform != "win32":
            fname = 'xx\rxx\nxx\'xx"xx'
        else:
            fname = "xx'xx"
        with open(fname, 'w') as f:
            self.addCleanup(os.remove, fname)
            expected_prefix = "<open file %r, mode 'w' at" % fname
            self.assertTrue(repr(f).startswith(expected_prefix))

    def testErrors(self):
        # Basic state queries on a file reopened for reading, plus the
        # TypeError raised by readinto() when given a str.
        self.f.close()
        self.f = open(TESTFN, 'rb')
        reader = self.f
        self.assertEqual(reader.name, TESTFN)
        self.assertFalse(reader.isatty())
        self.assertFalse(reader.closed)

        self.assertRaises(TypeError, reader.readinto, "")
        reader.close()
        self.assertTrue(reader.closed)

    def testMethods(self):
        # Every I/O method must raise ValueError once the file is closed.
        closed_file_methods = ['fileno', 'flush', 'isatty', 'next', 'read',
                               'readinto', 'readline', 'readlines', 'seek',
                               'tell', 'truncate', 'write', '__iter__']
        if sys.platform.startswith('atheos'):
            closed_file_methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for name in closed_file_methods:
            self.assertRaises(ValueError, getattr(self.f, name))
        # Deprecated methods are looked up inside the py3k-warnings guard.
        with test_support.check_py3k_warnings():
            for name in ['xreadlines']:
                self.assertRaises(ValueError, getattr(self.f, name))
        self.assertRaises(ValueError, self.f.writelines, [])

        # On an already closed file __exit__ does nothing and returns None...
        self.assertEqual(self.f.__exit__(None, None, None), None)
        # ...including when it is handed live exception information.
        try:
            1 // 0
        except ZeroDivisionError:
            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        # Reading from a file opened for writing raises IOError.
        with self.assertRaises(IOError):
            self.f.read()

    def testNastyWritelinesGenerator(self):
        # The iterable passed to writelines() closes the file part-way
        # through; writelines() must notice and raise ValueError.
        def closes_midway():
            for index in range(5):
                if index == 3:
                    self.f.close()
                yield str(index)

        self.assertRaises(ValueError, self.f.writelines, closes_midway())

    def testIssue5677(self):
        # Remark: Do not perform more than one test per open file,
        # since that does NOT catch the readline error on Windows.
        payload = 'xxx'

        def expect_ioerror(mode, operation, prefill):
            # Open a fresh handle, optionally write to it, and check that
            # the wrong-direction operation raises IOError.
            self.f = open(TESTFN, mode)
            if prefill:
                self.f.write(payload)
            self.assertRaises(IOError, operation, self.f)
            self.f.close()

        reading = (
            lambda fobj: fobj.read(),
            lambda fobj: fobj.readline(),
            lambda fobj: fobj.readlines(),
            lambda fobj: [line for line in fobj],
            lambda fobj: fobj.readinto(bytearray(len(payload))),
        )
        for mode in ['w', 'wb', 'a', 'ab']:
            for operation in reading:
                expect_ioerror(mode, operation, True)

        writing = (
            lambda fobj: fobj.write(payload),
            lambda fobj: fobj.writelines([payload, payload]),
            lambda fobj: fobj.truncate(),
        )
        for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
            for operation in writing:
                expect_ioerror(mode, operation, False)
189
class OtherFileTests(unittest.TestCase):
    # File tests that create (and are responsible for removing) their own
    # files; there is no shared setUp()/tearDown().

    def testOpenDir(self):
        # Opening a directory must raise IOError carrying the directory name.
        this_dir = os.path.dirname(__file__) or os.curdir
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                # Don't leak the handle on the (unexpected) success path.
                f.close()
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdin(self):
        # Seeking to a negative offset and truncating must both raise
        # IOError on sys.stdin.
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # Close and remove the file even when the assertion fails.
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # Start from a clean slate.  The removal is conditional so the test
        # no longer relies on an earlier test having left TESTFN behind.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            f = open(TESTFN, 'rb+')
            data = f.read(5)
            if data != '12345':
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = open(TESTFN)
                if f.next() != filler:
                    # Actually call fail(); the original built a tuple and
                    # could never report a broken fixture.
                    self.fail("Broken testfile")
                meth = getattr(f, methodname)
                try:
                    meth(*args)
                except ValueError:
                    pass
                else:
                    self.fail("%s%r after next() didn't raise ValueError" %
                              (methodname, args))
                f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = open(TESTFN)
            for i in range(nchunks):
                f.next()
            testline = testlines.pop(0)
            try:
                line = f.readline()
            except ValueError:
                self.fail("readline() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("readline() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            testline = testlines.pop(0)
            buf = array("c", "\x00" * len(testline))
            try:
                f.readinto(buf)
            except ValueError:
                self.fail("readinto() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            line = buf.tostring()
            if line != testline:
                self.fail("readinto() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))

            testline = testlines.pop(0)
            try:
                line = f.read(len(testline))
            except ValueError:
                self.fail("read() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("read() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            try:
                lines = f.readlines()
            except ValueError:
                self.fail("readlines() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if lines != testlines:
                # Report the values that were actually compared.
                self.fail("readlines() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (lines, testlines))
            # Release the handle explicitly before reopening the file.
            f.close()
            # Reading after iteration hit EOF shouldn't hurt either
            f = open(TESTFN)
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)

    @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
    def test_write_full(self):
        # Issue #17976
        devfull = '/dev/full'
        if not os.path.exists(devfull):
            # Not every POSIX system provides this device.
            self.skipTest('requires %r' % devfull)
        with open(devfull, 'w', 1) as f:
            with self.assertRaises(IOError):
                f.write('hello')
                f.write('\n')
425
class FileSubclassTests(unittest.TestCase):

    def testExit(self):
        # Leaving a ``with`` block must go through the subclass' close()
        # override, not only the base implementation.
        class CloseRecorder(file):
            def __init__(self, *args):
                # Set the flag before the base initializer runs.
                self.subclass_closed = False
                file.__init__(self, *args)

            def close(self):
                self.subclass_closed = True
                file.close(self)

        with CloseRecorder(TESTFN, 'w') as handle:
            pass
        self.assertTrue(handle.subclass_closed)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000441
442
@unittest.skipUnless(threading, 'Threading required for this test.')
class FileThreadingTests(unittest.TestCase):
    # These tests check the ability to call various methods of file objects
    # (including close()) concurrently without crashing the Python interpreter.
    # See #815646, #595601
    #
    # Every test_close_open_* method supplies a small io_func() and hands it
    # to _test_close_open_io(), which runs it from several threads that also
    # keep closing and reopening the shared file object self.f.

    def setUp(self):
        # Thread bookkeeping captured here is passed to threading_cleanup()
        # in tearDown().
        self._threads = test_support.threading_setup()
        self.f = None
        self.filename = TESTFN
        # Seed the file with the ten digits, one per line.
        with open(self.filename, "w") as f:
            f.write("\n".join("0123456789"))
        # Guards the two close counters below.
        self._count_lock = threading.Lock()
        self.close_count = 0
        self.close_success_count = 0
        # When True, _create_file() opens the file with a 16 KiB buffer.
        self.use_buffering = False

    def tearDown(self):
        # Best-effort cleanup: errors from close() and remove() are ignored.
        if self.f:
            try:
                self.f.close()
            except (EnvironmentError, ValueError):
                pass
        try:
            os.remove(self.filename)
        except EnvironmentError:
            pass
        test_support.threading_cleanup(*self._threads)

    def _create_file(self):
        # (Re)open the shared file in "w+" mode and store it in self.f.
        if self.use_buffering:
            self.f = open(self.filename, "w+", buffering=1024*16)
        else:
            self.f = open(self.filename, "w+")

    def _close_file(self):
        # close_count is bumped before the attempt and close_success_count
        # only after close() returned, so their difference is the number of
        # close() calls that raised or are still in progress.
        with self._count_lock:
            self.close_count += 1
        self.f.close()
        with self._count_lock:
            self.close_success_count += 1

    def _close_and_reopen_file(self):
        self._close_file()
        # if close raises an exception that's fine, self.f remains valid so
        # we don't need to reopen.
        self._create_file()

    def _run_workers(self, func, nb_workers, duration=0.2):
        # Run func in nb_workers threads for roughly 2 * duration seconds,
        # then signal them to stop through self.do_continue and join them.
        with self._count_lock:
            self.close_count = 0
            self.close_success_count = 0
        self.do_continue = True
        threads = []
        try:
            for i in range(nb_workers):
                t = threading.Thread(target=func)
                t.start()
                threads.append(t)
            # Poll in 100 small steps; stop polling early once the number of
            # unsuccessful close() calls exceeds nb_workers + 1.
            for _ in xrange(100):
                time.sleep(duration/100)
                with self._count_lock:
                    if self.close_count-self.close_success_count > nb_workers+1:
                        if test_support.verbose:
                            print 'Q',
                        break
            time.sleep(duration)
        finally:
            # Always stop and join the workers, even if the loop above raised.
            self.do_continue = False
            for t in threads:
                t.join()

    def _test_close_open_io(self, io_func, nb_workers=5):
        # Each worker alternates between io_func() and closing/reopening the
        # shared file until told to stop.  IOError and ValueError raised by
        # either step are ignored.
        def worker():
            self._create_file()
            funcs = itertools.cycle((
                lambda: io_func(),
                lambda: self._close_and_reopen_file(),
            ))
            for f in funcs:
                if not self.do_continue:
                    break
                try:
                    f()
                except (IOError, ValueError):
                    pass
        self._run_workers(worker, nb_workers)
        if test_support.verbose:
            # Useful verbose statistics when tuning this test to take
            # less time to run but still ensuring that it's still useful.
            #
            # the percent of close calls that raised an error
            percent = 100. - 100.*self.close_success_count/self.close_count
            print self.close_count, ('%.4f ' % percent),

    def test_close_open(self):
        # Baseline: no I/O at all, only concurrent close/reopen.
        def io_func():
            pass
        self._test_close_open_io(io_func)

    def test_close_open_flush(self):
        # Concurrent flush() versus close/reopen.
        def io_func():
            self.f.flush()
        self._test_close_open_io(io_func)

    def test_close_open_iter(self):
        # Concurrent full iteration versus close/reopen.
        def io_func():
            list(iter(self.f))
        self._test_close_open_io(io_func)

    def test_close_open_isatty(self):
        # Concurrent isatty() versus close/reopen.
        def io_func():
            self.f.isatty()
        self._test_close_open_io(io_func)

    def test_close_open_print(self):
        # Concurrent ``print >>file`` versus close/reopen.
        def io_func():
            print >> self.f, ''
        self._test_close_open_io(io_func)

    def test_close_open_print_buffered(self):
        # Same as test_close_open_print, with the file opened buffered.
        self.use_buffering = True
        def io_func():
            print >> self.f, ''
        self._test_close_open_io(io_func)

    def test_close_open_read(self):
        # Concurrent zero-length read() versus close/reopen.
        def io_func():
            self.f.read(0)
        self._test_close_open_io(io_func)

    def test_close_open_readinto(self):
        # Concurrent readinto() versus close/reopen.
        def io_func():
            a = array('c', 'xxxxx')
            self.f.readinto(a)
        self._test_close_open_io(io_func)

    def test_close_open_readline(self):
        # Concurrent readline() versus close/reopen.
        def io_func():
            self.f.readline()
        self._test_close_open_io(io_func)

    def test_close_open_readlines(self):
        # Concurrent readlines() versus close/reopen.
        def io_func():
            self.f.readlines()
        self._test_close_open_io(io_func)

    def test_close_open_seek(self):
        # Concurrent seek() to the start versus close/reopen.
        def io_func():
            self.f.seek(0, 0)
        self._test_close_open_io(io_func)

    def test_close_open_tell(self):
        # Concurrent tell() versus close/reopen.
        def io_func():
            self.f.tell()
        self._test_close_open_io(io_func)

    def test_close_open_truncate(self):
        # Concurrent truncate() versus close/reopen.
        def io_func():
            self.f.truncate()
        self._test_close_open_io(io_func)

    def test_close_open_write(self):
        # Concurrent empty write() versus close/reopen.
        def io_func():
            self.f.write('')
        self._test_close_open_io(io_func)

    def test_close_open_writelines(self):
        # Concurrent empty writelines() versus close/reopen.
        def io_func():
            self.f.writelines('')
        self._test_close_open_io(io_func)
614
615
@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
class TestFileSignalEINTR(unittest.TestCase):
    # Each test spawns a child interpreter that reads from a pipe while this
    # process sends it SIGINT, then checks that the child's read returned
    # all of the data.

    def _test_reading(self, data_to_write, read_and_verify_code, method_name,
                      universal_newlines=False):
        """Generic buffered read method test harness to verify EINTR behavior.

        Also validates that Python signal handlers are run during the read.

        Args:
            data_to_write: String to write to the child process for reading
                before sending it a signal, confirming the signal was handled,
                writing a final newline char and closing the infile pipe.
            read_and_verify_code: Single "line" of code to read from a file
                object named 'infile' and validate the result.  This will be
                executed as part of a python subprocess fed data_to_write.
            method_name: The name of the read method being tested, for use in
                an error message on failure.
            universal_newlines: If True, infile will be opened in universal
                newline mode in the child process.
        """
        if universal_newlines:
            # Test the \r\n -> \n conversion while we're at it.
            data_to_write = data_to_write.replace('\n', '\r\n')
            infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
        else:
            infile_setup_code = 'infile = sys.stdin'
        # Total pipe IO in this function is smaller than the minimum posix OS
        # pipe buffer size of 512 bytes.  No writer should block.
        assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'

        # The child installs a SIGINT handler that writes "$\n" to stderr,
        # announces readiness with "Go.\n", then runs the read-and-verify
        # snippet.
        child_code = (
            'import os, signal, sys ;'
            'signal.signal('
            'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
            + infile_setup_code + ' ;' +
            'assert isinstance(infile, file) ;'
            'sys.stderr.write("Go.\\n") ;'
            + read_and_verify_code)
        reader_process = subprocess.Popen(
            [sys.executable, '-c', child_code],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # Wait for the signal handler to be installed.
        go = reader_process.stderr.read(4)
        if go != 'Go.\n':
            reader_process.kill()
            self.fail('Error from %s process while awaiting "Go":\n%s' % (
                method_name, go+reader_process.stderr.read()))
        reader_process.stdin.write(data_to_write)
        signals_sent = 0
        rlist = []
        # We don't know when the read_and_verify_code in our child is actually
        # executing within the read system call we want to interrupt.  This
        # loop waits for a bit before sending the first signal to increase
        # the likelihood of that.  Implementations without correct EINTR
        # and signal handling usually fail this test.
        while not rlist:
            # rlist becomes non-empty once the child's stderr is readable;
            # the value is only checked at the top of the next iteration, so
            # a signal is sent on every pass through the loop.
            rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
            reader_process.send_signal(signal.SIGINT)
            # Give the subprocess time to handle it before we loop around and
            # send another one.  On OSX the second signal happening close to
            # immediately after the first was causing the subprocess to crash
            # via the OS's default SIGINT handler.
            time.sleep(0.1)
            signals_sent += 1
            # Give up after more than 200 signals without any stderr output.
            if signals_sent > 200:
                reader_process.kill()
                self.fail("failed to handle signal during %s." % method_name)
        # This assumes anything unexpected that writes to stderr will also
        # write a newline.  That is true of the traceback printing code.
        signal_line = reader_process.stderr.readline()
        if signal_line != '$\n':
            reader_process.kill()
            self.fail('Error from %s process while awaiting signal:\n%s' % (
                method_name, signal_line+reader_process.stderr.read()))
        # We append a newline to our input so that a readline call can
        # end on its own before the EOF is seen.
        stdout, stderr = reader_process.communicate(input='\n')
        # A non-zero exit status means an assertion in the child failed (or
        # the child died); include its captured output in the failure.
        if reader_process.returncode != 0:
            self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
                method_name, reader_process.returncode, stdout, stderr))

    def test_readline(self, universal_newlines=False):
        """file.readline must handle signals and not lose data."""
        self._test_reading(
            data_to_write='hello, world!',
            read_and_verify_code=(
                'line = infile.readline() ;'
                'expected_line = "hello, world!\\n" ;'
                'assert line == expected_line, ('
                '"read %r expected %r" % (line, expected_line))'
            ),
            method_name='readline',
            universal_newlines=universal_newlines)

    def test_readline_with_universal_newlines(self):
        # Re-run the readline test with the child reading in "rU" mode.
        self.test_readline(universal_newlines=True)

    def test_readlines(self, universal_newlines=False):
        """file.readlines must handle signals and not lose data."""
        self._test_reading(
            data_to_write='hello\nworld!',
            read_and_verify_code=(
                'lines = infile.readlines() ;'
                'expected_lines = ["hello\\n", "world!\\n"] ;'
                'assert lines == expected_lines, ('
                '"readlines returned wrong data.\\n" '
                '"got lines %r\\nexpected %r" '
                '% (lines, expected_lines))'
            ),
            method_name='readlines',
            universal_newlines=universal_newlines)

    def test_readlines_with_universal_newlines(self):
        # Re-run the readlines test with the child reading in "rU" mode.
        self.test_readlines(universal_newlines=True)

    def test_readall(self):
        """Unbounded file.read() must handle signals and not lose data."""
        self._test_reading(
            data_to_write='hello, world!abcdefghijklm',
            read_and_verify_code=(
                'data = infile.read() ;'
                'expected_data = "hello, world!abcdefghijklm\\n";'
                'assert data == expected_data, ('
                '"read %r expected %r" % (data, expected_data))'
            ),
            method_name='unbounded read')

    def test_readinto(self):
        """file.readinto must handle signals and not lose data."""
        self._test_reading(
            data_to_write='hello, world!',
            read_and_verify_code=(
                'data = bytearray(50) ;'
                'num_read = infile.readinto(data) ;'
                'expected_data = "hello, world!\\n";'
                'assert data[:num_read] == expected_data, ('
                '"read %r expected %r" % (data, expected_data))'
            ),
            method_name='readinto')
756
757
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000758class StdoutTests(unittest.TestCase):
759
760 def test_move_stdout_on_write(self):
761 # Issue 3242: sys.stdout can be replaced (and freed) during a
762 # print statement; prevent a segfault in this case
763 save_stdout = sys.stdout
764
765 class File:
766 def write(self, data):
767 if '\n' in data:
768 sys.stdout = save_stdout
769
770 try:
771 sys.stdout = File()
772 print "some text"
773 finally:
774 sys.stdout = save_stdout
775
776 def test_del_stdout_before_print(self):
777 # Issue 4597: 'print' with no argument wasn't reporting when
778 # sys.stdout was deleted.
779 save_stdout = sys.stdout
780 del sys.stdout
781 try:
782 print
783 except RuntimeError as e:
Ezio Melotti2623a372010-11-21 13:34:58 +0000784 self.assertEqual(str(e), "lost sys.stdout")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000785 else:
786 self.fail("Expected RuntimeError")
787 finally:
788 sys.stdout = save_stdout
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000789
Victor Stinnercaafd772010-09-08 10:51:01 +0000790 def test_unicode(self):
791 import subprocess
792
793 def get_message(encoding, *code):
794 code = '\n'.join(code)
795 env = os.environ.copy()
796 env['PYTHONIOENCODING'] = encoding
797 process = subprocess.Popen([sys.executable, "-c", code],
798 stdout=subprocess.PIPE, env=env)
799 stdout, stderr = process.communicate()
800 self.assertEqual(process.returncode, 0)
801 return stdout
802
803 def check_message(text, encoding, expected):
804 stdout = get_message(encoding,
805 "import sys",
806 "sys.stdout.write(%r)" % text,
807 "sys.stdout.flush()")
808 self.assertEqual(stdout, expected)
809
Victor Stinner3a68f912010-09-08 11:45:16 +0000810 # test the encoding
811 check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
812 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
813 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
Victor Stinnercaafd772010-09-08 10:51:01 +0000814
Victor Stinner3a68f912010-09-08 11:45:16 +0000815 # test the error handler
816 check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
817 check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
818 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
819
820 # test the buffer API
Victor Stinnercaafd772010-09-08 10:51:01 +0000821 for objtype in ('buffer', 'bytearray'):
822 stdout = get_message('ascii',
823 'import sys',
Victor Stinner3a68f912010-09-08 11:45:16 +0000824 r'sys.stdout.write(%s("\xe9"))' % objtype,
825 'sys.stdout.flush()')
826 self.assertEqual(stdout, "\xe9")
Victor Stinnercaafd772010-09-08 10:51:01 +0000827
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000828
def test_main():
    # Run every test class in this module.  Historically, these tests have
    # been sloppy about removing TESTFN, so get rid of it no matter what.
    test_classes = (
        AutoFileTests,
        OtherFileTests,
        FileSubclassTests,
        FileThreadingTests,
        TestFileSignalEINTR,
        StdoutTests,
    )
    try:
        run_unittest(*test_classes)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)
838
# Run the whole suite when this module is executed as a script.
if __name__ == '__main__':
    test_main()