# blob: cab4079c3eb8c24b5d487e78e39df40338e945b6
import sys
import os
import unittest
import itertools
import select
import signal
import subprocess
import time
from array import array
from weakref import proxy
# threading is optional: FileThreadingTests is skipped when it is None.
try:
    import threading
except ImportError:
    threading = None

from test import test_support
from test.test_support import TESTFN, run_unittest
from UserList import UserList

class AutoFileTests(unittest.TestCase):
    """File tests for which a test file is automatically set up.

    setUp() opens TESTFN for binary writing and stores it as self.f;
    tearDown() closes self.f (when it is still set) and removes TESTFN.
    """

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # Tests that drop their reference set self.f to None.
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        self.f = None
        # The referent is gone, so the proxy must refuse attribute access.
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        with test_support.check_py3k_warnings():
            softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        with test_support.check_py3k_warnings():
            # verify softspace is writable
            f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEqual('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1, 2, 3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testWritelinesBuffer(self):
        # verify writelines accepts an array of characters
        self.f.writelines([array('c', 'abc')])
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, 'abc')

    def testRepr(self):
        # verify repr works
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
        # see issue #14161
        # Windows doesn't like \r\n\t" in the file name, but ' is ok
        fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
        with open(fname, 'w') as f:
            self.addCleanup(os.remove, fname)
            self.assertTrue(repr(f).startswith(
                    "<open file %r, mode 'w' at" % fname))

    def testErrors(self):
        # Re-open the test file for reading and check basic state and errors.
        self.f.close()
        self.f = open(TESTFN, 'rb')
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # Every I/O method must raise ValueError once the file is closed.
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', '__iter__']
        deprecated_methods = ['xreadlines']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        with test_support.check_py3k_warnings():
            for methodname in deprecated_methods:
                method = getattr(self.f, methodname)
                self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertIsNone(self.f.__exit__(None, None, None))
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            self.assertIsNone(self.f.__exit__(*sys.exc_info()))

    def testReadWhenWriting(self):
        # self.f is opened write-only, so reading must fail.
        self.assertRaises(IOError, self.f.read)

    def testNastyWritelinesGenerator(self):
        # The generator closes the file while writelines() is consuming it.
        def nasty():
            for i in range(5):
                if i == 3:
                    self.f.close()
                yield str(i)
        self.assertRaises(ValueError, self.f.writelines, nasty())

    def testIssue5677(self):
        # Remark: Do not perform more than one test per open file,
        # since that does NOT catch the readline error on Windows.
        data = 'xxx'
        # Reading from a file opened in a write/append mode must fail.
        for mode in ['w', 'wb', 'a', 'ab']:
            for attr in ['read', 'readline', 'readlines']:
                self.f = open(TESTFN, mode)
                self.f.write(data)
                self.assertRaises(IOError, getattr(self.f, attr))
                self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, lambda: [line for line in self.f])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
            self.f.close()

        # Writing to a file opened in a read mode must fail.
        for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.write, data)
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.writelines, [data, data])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.truncate)
            self.f.close()

class OtherFileTests(unittest.TestCase):
    """File tests that create (and remove) their own files as needed."""

    def testOpenDir(self):
        # Opening a directory must raise IOError naming that directory.
        this_dir = os.path.dirname(__file__) or os.curdir
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # Clean up even when the assertion above fails.
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # Start from a clean slate.  The removal is conditional so this test
        # does not depend on an earlier test having left TESTFN behind.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            f = open(TESTFN, 'rb+')
            data = f.read(5)
            if data != '12345':
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = open(TESTFN)
                if f.next() != filler:
                    # Actually call fail(); a bare "self.fail, msg" tuple
                    # would be silently discarded.
                    self.fail("Broken testfile")
                meth = getattr(f, methodname)
                try:
                    meth(*args)
                except ValueError:
                    pass
                else:
                    self.fail("%s%r after next() didn't raise ValueError" %
                              (methodname, args))
                f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = open(TESTFN)
            for i in range(nchunks):
                f.next()
            testline = testlines.pop(0)
            try:
                line = f.readline()
            except ValueError:
                self.fail("readline() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("readline() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            testline = testlines.pop(0)
            buf = array("c", "\x00" * len(testline))
            try:
                f.readinto(buf)
            except ValueError:
                self.fail("readinto() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            line = buf.tostring()
            if line != testline:
                self.fail("readinto() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))

            testline = testlines.pop(0)
            try:
                line = f.read(len(testline))
            except ValueError:
                self.fail("read() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("read() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            try:
                lines = f.readlines()
            except ValueError:
                self.fail("readlines() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if lines != testlines:
                # Report the values that were actually compared.
                self.fail("readlines() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (lines, testlines))
            # Release this handle before the name is rebound below.
            f.close()
            # Reading after iteration hit EOF shouldn't hurt either
            f = open(TESTFN)
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)

    @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
    def test_write_full(self):
        # Issue #17976
        try:
            f = open('/dev/full', 'w', 1)
        except IOError:
            self.skipTest("requires '/dev/full'")
        try:
            with self.assertRaises(IOError):
                f.write('hello')
                f.write('\n')
        finally:
            f.close()

    @unittest.skipUnless(sys.maxsize > 2**31, "requires 64-bit system")
    @test_support.precisionbigmemtest(2**31, 1)
    def test_very_long_line(self, maxsize):
        # Issue #22526
        # NOTE(review): maxsize is supplied by the bigmem decorator but is not
        # used here; the line length is always 2**31.
        with open(TESTFN, "wb") as fp:
            # Don't leave a 2 GiB file behind once the test is over.
            self.addCleanup(os.remove, TESTFN)
            fp.write("\0"*2**31)
        with open(TESTFN, "rb") as fp:
            for l in fp:
                pass
        self.assertEqual(len(l), 2**31)
        self.assertEqual(l.count("\0"), 2**31)
        l = None

class FileSubclassTests(unittest.TestCase):
    """Tests for subclasses of the built-in file type."""

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                # Set the flag before file.__init__ so it always exists.
                self.subclass_closed = False
                file.__init__(self, *args)

            def close(self):
                self.subclass_closed = True
                file.close(self)

        with C(TESTFN, 'w') as f:
            pass
        self.assertTrue(f.subclass_closed)


Victor Stinner6a102812010-04-27 23:55:59 +0000469@unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000470class FileThreadingTests(unittest.TestCase):
471 # These tests check the ability to call various methods of file objects
472 # (including close()) concurrently without crashing the Python interpreter.
473 # See #815646, #595601
474
475 def setUp(self):
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000476 self._threads = test_support.threading_setup()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000477 self.f = None
478 self.filename = TESTFN
479 with open(self.filename, "w") as f:
480 f.write("\n".join("0123456789"))
481 self._count_lock = threading.Lock()
482 self.close_count = 0
483 self.close_success_count = 0
Antoine Pitrou83137c22010-05-17 19:56:59 +0000484 self.use_buffering = False
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000485
486 def tearDown(self):
487 if self.f:
488 try:
489 self.f.close()
490 except (EnvironmentError, ValueError):
491 pass
492 try:
493 os.remove(self.filename)
494 except EnvironmentError:
495 pass
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000496 test_support.threading_cleanup(*self._threads)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000497
498 def _create_file(self):
Antoine Pitrou83137c22010-05-17 19:56:59 +0000499 if self.use_buffering:
500 self.f = open(self.filename, "w+", buffering=1024*16)
501 else:
502 self.f = open(self.filename, "w+")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000503
504 def _close_file(self):
505 with self._count_lock:
506 self.close_count += 1
507 self.f.close()
508 with self._count_lock:
509 self.close_success_count += 1
510
511 def _close_and_reopen_file(self):
512 self._close_file()
513 # if close raises an exception thats fine, self.f remains valid so
514 # we don't need to reopen.
515 self._create_file()
516
517 def _run_workers(self, func, nb_workers, duration=0.2):
518 with self._count_lock:
519 self.close_count = 0
520 self.close_success_count = 0
521 self.do_continue = True
522 threads = []
523 try:
524 for i in range(nb_workers):
525 t = threading.Thread(target=func)
526 t.start()
527 threads.append(t)
528 for _ in xrange(100):
529 time.sleep(duration/100)
530 with self._count_lock:
531 if self.close_count-self.close_success_count > nb_workers+1:
532 if test_support.verbose:
533 print 'Q',
534 break
535 time.sleep(duration)
536 finally:
537 self.do_continue = False
538 for t in threads:
539 t.join()
540
541 def _test_close_open_io(self, io_func, nb_workers=5):
542 def worker():
543 self._create_file()
544 funcs = itertools.cycle((
545 lambda: io_func(),
546 lambda: self._close_and_reopen_file(),
547 ))
548 for f in funcs:
549 if not self.do_continue:
550 break
551 try:
552 f()
553 except (IOError, ValueError):
554 pass
555 self._run_workers(worker, nb_workers)
556 if test_support.verbose:
557 # Useful verbose statistics when tuning this test to take
558 # less time to run but still ensuring that its still useful.
559 #
560 # the percent of close calls that raised an error
561 percent = 100. - 100.*self.close_success_count/self.close_count
562 print self.close_count, ('%.4f ' % percent),
563
564 def test_close_open(self):
565 def io_func():
566 pass
567 self._test_close_open_io(io_func)
568
569 def test_close_open_flush(self):
570 def io_func():
571 self.f.flush()
572 self._test_close_open_io(io_func)
573
574 def test_close_open_iter(self):
575 def io_func():
576 list(iter(self.f))
577 self._test_close_open_io(io_func)
578
579 def test_close_open_isatty(self):
580 def io_func():
581 self.f.isatty()
582 self._test_close_open_io(io_func)
583
584 def test_close_open_print(self):
585 def io_func():
586 print >> self.f, ''
587 self._test_close_open_io(io_func)
588
Antoine Pitrou83137c22010-05-17 19:56:59 +0000589 def test_close_open_print_buffered(self):
590 self.use_buffering = True
591 def io_func():
592 print >> self.f, ''
593 self._test_close_open_io(io_func)
594
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000595 def test_close_open_read(self):
596 def io_func():
597 self.f.read(0)
598 self._test_close_open_io(io_func)
599
600 def test_close_open_readinto(self):
601 def io_func():
602 a = array('c', 'xxxxx')
603 self.f.readinto(a)
604 self._test_close_open_io(io_func)
605
606 def test_close_open_readline(self):
607 def io_func():
608 self.f.readline()
609 self._test_close_open_io(io_func)
610
611 def test_close_open_readlines(self):
612 def io_func():
613 self.f.readlines()
614 self._test_close_open_io(io_func)
615
616 def test_close_open_seek(self):
617 def io_func():
618 self.f.seek(0, 0)
619 self._test_close_open_io(io_func)
620
621 def test_close_open_tell(self):
622 def io_func():
623 self.f.tell()
624 self._test_close_open_io(io_func)
625
626 def test_close_open_truncate(self):
627 def io_func():
628 self.f.truncate()
629 self._test_close_open_io(io_func)
630
631 def test_close_open_write(self):
632 def io_func():
633 self.f.write('')
634 self._test_close_open_io(io_func)
635
636 def test_close_open_writelines(self):
637 def io_func():
638 self.f.writelines('')
639 self._test_close_open_io(io_func)
640
641
@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
class TestFileSignalEINTR(unittest.TestCase):
    """Check that file read methods survive a signal arriving mid-read.

    Each test runs a child interpreter that reads from a pipe while this
    process sends it SIGINT.
    """

    def _test_reading(self, data_to_write, read_and_verify_code, method_name,
                      universal_newlines=False):
        """Generic buffered read method test harness to verify EINTR behavior.

        Also validates that Python signal handlers are run during the read.

        Args:
            data_to_write: String to write to the child process for reading
                before sending it a signal, confirming the signal was handled,
                writing a final newline char and closing the infile pipe.
            read_and_verify_code: Single "line" of code to read from a file
                object named 'infile' and validate the result.  This will be
                executed as part of a python subprocess fed data_to_write.
            method_name: The name of the read method being tested, for use in
                an error message on failure.
            universal_newlines: If True, infile will be opened in universal
                newline mode in the child process.
        """
        if universal_newlines:
            # Test the \r\n -> \n conversion while we're at it.
            data_to_write = data_to_write.replace('\n', '\r\n')
            infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
        else:
            infile_setup_code = 'infile = sys.stdin'
        # Total pipe IO in this function is smaller than the minimum posix OS
        # pipe buffer size of 512 bytes.  No writer should block.
        assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'

        # The child installs a SIGINT handler that writes "$" to stderr,
        # announces "Go." and then runs the read-and-verify snippet.
        child_code = (
             'import os, signal, sys ;'
             'signal.signal('
                     'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
             + infile_setup_code + ' ;' +
             'assert isinstance(infile, file) ;'
             'sys.stderr.write("Go.\\n") ;'
             + read_and_verify_code)
        reader_process = subprocess.Popen(
                [sys.executable, '-c', child_code],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Wait for the signal handler to be installed.
        go = reader_process.stderr.read(4)
        if go != 'Go.\n':
            reader_process.kill()
            self.fail('Error from %s process while awaiting "Go":\n%s' % (
                    method_name, go+reader_process.stderr.read()))
        reader_process.stdin.write(data_to_write)
        signals_sent = 0
        rlist = []
        # We don't know when the read_and_verify_code in our child is actually
        # executing within the read system call we want to interrupt.  This
        # loop waits for a bit before sending the first signal to increase
        # the likelihood of that.  Implementations without correct EINTR
        # and signal handling usually fail this test.
        while not rlist:
            rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
            reader_process.send_signal(signal.SIGINT)
            # Give the subprocess time to handle it before we loop around and
            # send another one.  On OSX the second signal happening close to
            # immediately after the first was causing the subprocess to crash
            # via the OS's default SIGINT handler.
            time.sleep(0.1)
            signals_sent += 1
            if signals_sent > 200:
                reader_process.kill()
                self.fail("failed to handle signal during %s." % method_name)
        # This assumes anything unexpected that writes to stderr will also
        # write a newline.  That is true of the traceback printing code.
        signal_line = reader_process.stderr.readline()
        if signal_line != '$\n':
            reader_process.kill()
            self.fail('Error from %s process while awaiting signal:\n%s' % (
                    method_name, signal_line+reader_process.stderr.read()))
        # We append a newline to our input so that a readline call can
        # end on its own before the EOF is seen.
        stdout, stderr = reader_process.communicate(input='\n')
        if reader_process.returncode != 0:
            self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
                    method_name, reader_process.returncode, stdout, stderr))

    def test_readline(self, universal_newlines=False):
        """file.readline must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=(
                        'line = infile.readline() ;'
                        'expected_line = "hello, world!\\n" ;'
                        'assert line == expected_line, ('
                        '"read %r expected %r" % (line, expected_line))'
                ),
                method_name='readline',
                universal_newlines=universal_newlines)

    def test_readline_with_universal_newlines(self):
        self.test_readline(universal_newlines=True)

    def test_readlines(self, universal_newlines=False):
        """file.readlines must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello\nworld!',
                read_and_verify_code=(
                        'lines = infile.readlines() ;'
                        'expected_lines = ["hello\\n", "world!\\n"] ;'
                        'assert lines == expected_lines, ('
                        '"readlines returned wrong data.\\n" '
                        '"got lines %r\\nexpected %r" '
                        '% (lines, expected_lines))'
                ),
                method_name='readlines',
                universal_newlines=universal_newlines)

    def test_readlines_with_universal_newlines(self):
        self.test_readlines(universal_newlines=True)

    def test_readall(self):
        """Unbounded file.read() must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!abcdefghijklm',
                read_and_verify_code=(
                        'data = infile.read() ;'
                        'expected_data = "hello, world!abcdefghijklm\\n";'
                        'assert data == expected_data, ('
                        '"read %r expected %r" % (data, expected_data))'
                ),
                method_name='unbounded read')

    def test_readinto(self):
        """file.readinto must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=(
                        'data = bytearray(50) ;'
                        'num_read = infile.readinto(data) ;'
                        'expected_data = "hello, world!\\n";'
                        'assert data[:num_read] == expected_data, ('
                        '"read %r expected %r" % (data, expected_data))'
                ),
                method_name='readinto')


Antoine Pitrou47a5f482009-06-12 20:41:52 +0000784class StdoutTests(unittest.TestCase):
785
786 def test_move_stdout_on_write(self):
787 # Issue 3242: sys.stdout can be replaced (and freed) during a
788 # print statement; prevent a segfault in this case
789 save_stdout = sys.stdout
790
791 class File:
792 def write(self, data):
793 if '\n' in data:
794 sys.stdout = save_stdout
795
796 try:
797 sys.stdout = File()
798 print "some text"
799 finally:
800 sys.stdout = save_stdout
801
802 def test_del_stdout_before_print(self):
803 # Issue 4597: 'print' with no argument wasn't reporting when
804 # sys.stdout was deleted.
805 save_stdout = sys.stdout
806 del sys.stdout
807 try:
808 print
809 except RuntimeError as e:
Ezio Melotti2623a372010-11-21 13:34:58 +0000810 self.assertEqual(str(e), "lost sys.stdout")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000811 else:
812 self.fail("Expected RuntimeError")
813 finally:
814 sys.stdout = save_stdout
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000815
Victor Stinnercaafd772010-09-08 10:51:01 +0000816 def test_unicode(self):
817 import subprocess
818
819 def get_message(encoding, *code):
820 code = '\n'.join(code)
821 env = os.environ.copy()
822 env['PYTHONIOENCODING'] = encoding
823 process = subprocess.Popen([sys.executable, "-c", code],
824 stdout=subprocess.PIPE, env=env)
825 stdout, stderr = process.communicate()
826 self.assertEqual(process.returncode, 0)
827 return stdout
828
829 def check_message(text, encoding, expected):
830 stdout = get_message(encoding,
831 "import sys",
832 "sys.stdout.write(%r)" % text,
833 "sys.stdout.flush()")
834 self.assertEqual(stdout, expected)
835
Victor Stinner3a68f912010-09-08 11:45:16 +0000836 # test the encoding
837 check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
838 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
839 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
Victor Stinnercaafd772010-09-08 10:51:01 +0000840
Victor Stinner3a68f912010-09-08 11:45:16 +0000841 # test the error handler
842 check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
843 check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
844 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
845
846 # test the buffer API
Victor Stinnercaafd772010-09-08 10:51:01 +0000847 for objtype in ('buffer', 'bytearray'):
848 stdout = get_message('ascii',
849 'import sys',
Victor Stinner3a68f912010-09-08 11:45:16 +0000850 r'sys.stdout.write(%s("\xe9"))' % objtype,
851 'sys.stdout.flush()')
852 self.assertEqual(stdout, "\xe9")
Victor Stinnercaafd772010-09-08 10:51:01 +0000853
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000854
def test_main():
    """Run every test class in this module, then remove TESTFN if present."""
    # Historically, these tests have been sloppy about removing TESTFN.
    # So get rid of it no matter what.
    try:
        run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
            FileThreadingTests, TestFileSignalEINTR, StdoutTests)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

# Allow running this test module directly as a script.
if __name__ == '__main__':
    test_main()