blob: 3803950811f439f8e0444d3a2e4246dba40c14df [file] [log] [blame]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00001import sys
2import os
3import unittest
Antoine Pitrou47a5f482009-06-12 20:41:52 +00004import itertools
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -07005import select
6import signal
7import subprocess
Antoine Pitrou47a5f482009-06-12 20:41:52 +00008import time
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00009from array import array
10from weakref import proxy
Victor Stinner6a102812010-04-27 23:55:59 +000011try:
12 import threading
13except ImportError:
14 threading = None
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000015
Antoine Pitrou47a5f482009-06-12 20:41:52 +000016from test import test_support
Serhiy Storchaka80f0c822014-10-12 17:13:06 +030017from test.test_support import TESTFN, run_unittest, requires
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000018from UserList import UserList
19
20class AutoFileTests(unittest.TestCase):
21 # file tests for which a test file is automatically set up
22
23 def setUp(self):
Antoine Pitrou47a5f482009-06-12 20:41:52 +000024 self.f = open(TESTFN, 'wb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000025
26 def tearDown(self):
27 if self.f:
28 self.f.close()
29 os.remove(TESTFN)
30
31 def testWeakRefs(self):
32 # verify weak references
33 p = proxy(self.f)
Antoine Pitrou47a5f482009-06-12 20:41:52 +000034 p.write('teststring')
Ezio Melotti2623a372010-11-21 13:34:58 +000035 self.assertEqual(self.f.tell(), p.tell())
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000036 self.f.close()
37 self.f = None
38 self.assertRaises(ReferenceError, getattr, p, 'tell')
39
40 def testAttributes(self):
41 # verify expected attributes exist
42 f = self.f
Ezio Melottid80b4bf2010-03-17 13:52:48 +000043 with test_support.check_py3k_warnings():
44 softspace = f.softspace
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000045 f.name # merely shouldn't blow up
46 f.mode # ditto
47 f.closed # ditto
48
Ezio Melottid80b4bf2010-03-17 13:52:48 +000049 with test_support.check_py3k_warnings():
50 # verify softspace is writable
51 f.softspace = softspace # merely shouldn't blow up
Antoine Pitrou47a5f482009-06-12 20:41:52 +000052
53 # verify the others aren't
54 for attr in 'name', 'mode', 'closed':
55 self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
56
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000057 def testReadinto(self):
58 # verify readinto
Antoine Pitrou47a5f482009-06-12 20:41:52 +000059 self.f.write('12')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000060 self.f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +000061 a = array('c', 'x'*10)
62 self.f = open(TESTFN, 'rb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000063 n = self.f.readinto(a)
Ezio Melotti2623a372010-11-21 13:34:58 +000064 self.assertEqual('12', a.tostring()[:n])
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000065
66 def testWritelinesUserList(self):
67 # verify writelines with instance sequence
Antoine Pitrou47a5f482009-06-12 20:41:52 +000068 l = UserList(['1', '2'])
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000069 self.f.writelines(l)
70 self.f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +000071 self.f = open(TESTFN, 'rb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000072 buf = self.f.read()
Ezio Melotti2623a372010-11-21 13:34:58 +000073 self.assertEqual(buf, '12')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000074
75 def testWritelinesIntegers(self):
76 # verify writelines with integers
77 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
78
79 def testWritelinesIntegersUserList(self):
80 # verify writelines with integers in UserList
81 l = UserList([1,2,3])
82 self.assertRaises(TypeError, self.f.writelines, l)
83
84 def testWritelinesNonString(self):
85 # verify writelines with non-string object
86 class NonString:
87 pass
88
89 self.assertRaises(TypeError, self.f.writelines,
90 [NonString(), NonString()])
91
Antoine Pitroub0acc1b2014-05-08 19:26:04 +020092 def testWritelinesBuffer(self):
93 self.f.writelines([array('c', 'abc')])
94 self.f.close()
95 self.f = open(TESTFN, 'rb')
96 buf = self.f.read()
97 self.assertEqual(buf, 'abc')
98
Antoine Pitrou47a5f482009-06-12 20:41:52 +000099 def testRepr(self):
100 # verify repr works
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000101 self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
Ezio Melotti11f8b682012-03-12 01:17:02 +0200102 # see issue #14161
103 # Windows doesn't like \r\n\t" in the file name, but ' is ok
104 fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
105 with open(fname, 'w') as f:
106 self.addCleanup(os.remove, fname)
107 self.assertTrue(repr(f).startswith(
108 "<open file %r, mode 'w' at" % fname))
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000109
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000110 def testErrors(self):
Antoine Pitroubb445a12010-02-05 17:05:54 +0000111 self.f.close()
112 self.f = open(TESTFN, 'rb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000113 f = self.f
Ezio Melotti2623a372010-11-21 13:34:58 +0000114 self.assertEqual(f.name, TESTFN)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000115 self.assertTrue(not f.isatty())
116 self.assertTrue(not f.closed)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000117
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000118 self.assertRaises(TypeError, f.readinto, "")
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000119 f.close()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000120 self.assertTrue(f.closed)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000121
122 def testMethods(self):
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000123 methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
124 'readline', 'readlines', 'seek', 'tell', 'truncate',
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000125 'write', '__iter__']
126 deprecated_methods = ['xreadlines']
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000127 if sys.platform.startswith('atheos'):
128 methods.remove('truncate')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000129
130 # __exit__ should close the file
131 self.f.__exit__(None, None, None)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000132 self.assertTrue(self.f.closed)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000133
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000134 for methodname in methods:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000135 method = getattr(self.f, methodname)
136 # should raise on closed file
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000137 self.assertRaises(ValueError, method)
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000138 with test_support.check_py3k_warnings():
139 for methodname in deprecated_methods:
140 method = getattr(self.f, methodname)
141 self.assertRaises(ValueError, method)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000142 self.assertRaises(ValueError, self.f.writelines, [])
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000143
144 # file is closed, __exit__ shouldn't do anything
Ezio Melotti2623a372010-11-21 13:34:58 +0000145 self.assertEqual(self.f.__exit__(None, None, None), None)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000146 # it must also return None if an exception was given
147 try:
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000148 1 // 0
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000149 except:
Ezio Melotti2623a372010-11-21 13:34:58 +0000150 self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000151
152 def testReadWhenWriting(self):
153 self.assertRaises(IOError, self.f.read)
154
Benjamin Petersonbf775542010-10-16 19:20:12 +0000155 def testNastyWritelinesGenerator(self):
156 def nasty():
157 for i in range(5):
158 if i == 3:
159 self.f.close()
160 yield str(i)
161 self.assertRaises(ValueError, self.f.writelines, nasty())
162
Antoine Pitroubb445a12010-02-05 17:05:54 +0000163 def testIssue5677(self):
164 # Remark: Do not perform more than one test per open file,
165 # since that does NOT catch the readline error on Windows.
166 data = 'xxx'
167 for mode in ['w', 'wb', 'a', 'ab']:
168 for attr in ['read', 'readline', 'readlines']:
169 self.f = open(TESTFN, mode)
170 self.f.write(data)
171 self.assertRaises(IOError, getattr(self.f, attr))
172 self.f.close()
173
174 self.f = open(TESTFN, mode)
175 self.f.write(data)
176 self.assertRaises(IOError, lambda: [line for line in self.f])
177 self.f.close()
178
179 self.f = open(TESTFN, mode)
180 self.f.write(data)
181 self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
182 self.f.close()
183
184 for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
185 self.f = open(TESTFN, mode)
186 self.assertRaises(IOError, self.f.write, data)
187 self.f.close()
188
189 self.f = open(TESTFN, mode)
190 self.assertRaises(IOError, self.f.writelines, [data, data])
191 self.f.close()
192
193 self.f = open(TESTFN, mode)
194 self.assertRaises(IOError, self.f.truncate)
195 self.f.close()
196
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000197class OtherFileTests(unittest.TestCase):
198
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000199 def testOpenDir(self):
Hirokazu Yamamotofa647ec2010-09-23 15:59:21 +0000200 this_dir = os.path.dirname(__file__) or os.curdir
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000201 for mode in (None, "w"):
202 try:
203 if mode:
204 f = open(this_dir, mode)
205 else:
206 f = open(this_dir)
207 except IOError as e:
208 self.assertEqual(e.filename, this_dir)
209 else:
210 self.fail("opening a directory didn't raise an IOError")
211
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000212 def testModeStrings(self):
213 # check invalid mode strings
214 for mode in ("", "aU", "wU+"):
215 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000216 f = open(TESTFN, mode)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000217 except ValueError:
218 pass
219 else:
220 f.close()
221 self.fail('%r is an invalid file mode' % mode)
222
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000223 # Some invalid modes fail on Windows, but pass on Unix
224 # Issue3965: avoid a crash on Windows when filename is unicode
225 for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
226 try:
227 f = open(name, "rr")
228 except (IOError, ValueError):
229 pass
230 else:
231 f.close()
232
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000233 def testStdin(self):
234 # This causes the interpreter to exit on OSF1 v5.1.
235 if sys.platform != 'osf1V5':
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000236 self.assertRaises(IOError, sys.stdin.seek, -1)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000237 else:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000238 print >>sys.__stdout__, (
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000239 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000240 ' Test manually.')
241 self.assertRaises(IOError, sys.stdin.truncate)
242
243 def testUnicodeOpen(self):
244 # verify repr works for unicode too
245 f = open(unicode(TESTFN), "w")
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000246 self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000247 f.close()
248 os.unlink(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000249
250 def testBadModeArgument(self):
251 # verify that we get a sensible error message for bad mode argument
252 bad_mode = "qwerty"
253 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000254 f = open(TESTFN, bad_mode)
255 except ValueError, msg:
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000256 if msg.args[0] != 0:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000257 s = str(msg)
Ezio Melotti187f93d2010-03-17 14:22:34 +0000258 if TESTFN in s or bad_mode not in s:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000259 self.fail("bad error message for invalid mode: %s" % s)
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000260 # if msg.args[0] == 0, we're probably on Windows where there may
261 # be no obvious way to discover why open() failed.
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000262 else:
263 f.close()
264 self.fail("no error for invalid mode: %s" % bad_mode)
265
266 def testSetBufferSize(self):
267 # make sure that explicitly setting the buffer size doesn't cause
268 # misbehaviour especially with repeated close() calls
269 for s in (-1, 0, 1, 512):
270 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000271 f = open(TESTFN, 'w', s)
272 f.write(str(s))
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000273 f.close()
274 f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000275 f = open(TESTFN, 'r', s)
276 d = int(f.read())
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000277 f.close()
278 f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000279 except IOError, msg:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000280 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
Ezio Melotti2623a372010-11-21 13:34:58 +0000281 self.assertEqual(d, s)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000282
283 def testTruncateOnWindows(self):
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000284 os.unlink(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000285
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000286 def bug801631():
287 # SF bug <http://www.python.org/sf/801631>
288 # "file.truncate fault on windows"
289 f = open(TESTFN, 'wb')
290 f.write('12345678901') # 11 bytes
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000291 f.close()
292
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000293 f = open(TESTFN,'rb+')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000294 data = f.read(5)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000295 if data != '12345':
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000296 self.fail("Read on file opened for update failed %r" % data)
297 if f.tell() != 5:
298 self.fail("File pos after read wrong %d" % f.tell())
299
300 f.truncate()
301 if f.tell() != 5:
302 self.fail("File pos after ftruncate wrong %d" % f.tell())
303
304 f.close()
305 size = os.path.getsize(TESTFN)
306 if size != 5:
307 self.fail("File size after ftruncate wrong %d" % size)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000308
309 try:
310 bug801631()
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000311 finally:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000312 os.unlink(TESTFN)
313
314 def testIteration(self):
315 # Test the complex interaction when mixing file-iteration and the
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000316 # various read* methods. Ostensibly, the mixture could just be tested
317 # to work when it should work according to the Python language,
318 # instead of fail when it should fail according to the current CPython
319 # implementation. People don't always program Python the way they
320 # should, though, and the implemenation might change in subtle ways,
321 # so we explicitly test for errors, too; the test will just have to
322 # be updated when the implementation changes.
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000323 dataoffset = 16384
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000324 filler = "ham\n"
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000325 assert not dataoffset % len(filler), \
326 "dataoffset must be multiple of len(filler)"
327 nchunks = dataoffset // len(filler)
328 testlines = [
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000329 "spam, spam and eggs\n",
330 "eggs, spam, ham and spam\n",
331 "saussages, spam, spam and eggs\n",
332 "spam, ham, spam and eggs\n",
333 "spam, spam, spam, spam, spam, ham, spam\n",
334 "wonderful spaaaaaam.\n"
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000335 ]
336 methods = [("readline", ()), ("read", ()), ("readlines", ()),
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000337 ("readinto", (array("c", " "*100),))]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000338
339 try:
340 # Prepare the testfile
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000341 bag = open(TESTFN, "w")
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000342 bag.write(filler * nchunks)
343 bag.writelines(testlines)
344 bag.close()
345 # Test for appropriate errors mixing read* and iteration
346 for methodname, args in methods:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000347 f = open(TESTFN)
348 if f.next() != filler:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000349 self.fail, "Broken testfile"
350 meth = getattr(f, methodname)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000351 try:
352 meth(*args)
353 except ValueError:
354 pass
355 else:
356 self.fail("%s%r after next() didn't raise ValueError" %
357 (methodname, args))
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000358 f.close()
359
360 # Test to see if harmless (by accident) mixing of read* and
361 # iteration still works. This depends on the size of the internal
362 # iteration buffer (currently 8192,) but we can test it in a
363 # flexible manner. Each line in the bag o' ham is 4 bytes
364 # ("h", "a", "m", "\n"), so 4096 lines of that should get us
365 # exactly on the buffer boundary for any power-of-2 buffersize
366 # between 4 and 16384 (inclusive).
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000367 f = open(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000368 for i in range(nchunks):
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000369 f.next()
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000370 testline = testlines.pop(0)
371 try:
372 line = f.readline()
373 except ValueError:
374 self.fail("readline() after next() with supposedly empty "
375 "iteration-buffer failed anyway")
376 if line != testline:
377 self.fail("readline() after next() with empty buffer "
378 "failed. Got %r, expected %r" % (line, testline))
379 testline = testlines.pop(0)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000380 buf = array("c", "\x00" * len(testline))
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000381 try:
382 f.readinto(buf)
383 except ValueError:
384 self.fail("readinto() after next() with supposedly empty "
385 "iteration-buffer failed anyway")
386 line = buf.tostring()
387 if line != testline:
388 self.fail("readinto() after next() with empty buffer "
389 "failed. Got %r, expected %r" % (line, testline))
390
391 testline = testlines.pop(0)
392 try:
393 line = f.read(len(testline))
394 except ValueError:
395 self.fail("read() after next() with supposedly empty "
396 "iteration-buffer failed anyway")
397 if line != testline:
398 self.fail("read() after next() with empty buffer "
399 "failed. Got %r, expected %r" % (line, testline))
400 try:
401 lines = f.readlines()
402 except ValueError:
403 self.fail("readlines() after next() with supposedly empty "
404 "iteration-buffer failed anyway")
405 if lines != testlines:
406 self.fail("readlines() after next() with empty buffer "
407 "failed. Got %r, expected %r" % (line, testline))
408 # Reading after iteration hit EOF shouldn't hurt either
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000409 f = open(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000410 try:
411 for line in f:
412 pass
413 try:
414 f.readline()
415 f.readinto(buf)
416 f.read()
417 f.readlines()
418 except ValueError:
419 self.fail("read* failed after next() consumed file")
420 finally:
421 f.close()
422 finally:
423 os.unlink(TESTFN)
424
Serhiy Storchaka6d562312013-12-17 14:40:06 +0200425 @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
426 def test_write_full(self):
427 # Issue #17976
Serhiy Storchaka84e7e5f2013-12-17 14:53:32 +0200428 try:
429 f = open('/dev/full', 'w', 1)
430 except IOError:
431 self.skipTest("requires '/dev/full'")
432 try:
Serhiy Storchaka6d562312013-12-17 14:40:06 +0200433 with self.assertRaises(IOError):
434 f.write('hello')
435 f.write('\n')
Serhiy Storchaka84e7e5f2013-12-17 14:53:32 +0200436 finally:
437 f.close()
Serhiy Storchaka6d562312013-12-17 14:40:06 +0200438
Benjamin Petersonbc4a8342014-09-30 21:28:27 -0400439 @unittest.skipUnless(sys.maxsize > 2**31, "requires 64-bit system")
Serhiy Storchaka80f0c822014-10-12 17:13:06 +0300440 @test_support.precisionbigmemtest(2**31, 2.5, dry_run=False)
441 def test_very_long_line(self, size):
Benjamin Peterson95bc0e42014-09-30 21:17:15 -0400442 # Issue #22526
Serhiy Storchaka80f0c822014-10-12 17:13:06 +0300443 requires('largefile')
Benjamin Peterson95bc0e42014-09-30 21:17:15 -0400444 with open(TESTFN, "wb") as fp:
Serhiy Storchaka80f0c822014-10-12 17:13:06 +0300445 fp.seek(size - 1)
446 fp.write("\0")
Benjamin Peterson95bc0e42014-09-30 21:17:15 -0400447 with open(TESTFN, "rb") as fp:
448 for l in fp:
449 pass
Serhiy Storchaka80f0c822014-10-12 17:13:06 +0300450 self.assertEqual(len(l), size)
451 self.assertEqual(l.count("\0"), size)
Benjamin Peterson95bc0e42014-09-30 21:17:15 -0400452 l = None
453
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000454class FileSubclassTests(unittest.TestCase):
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000455
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000456 def testExit(self):
457 # test that exiting with context calls subclass' close
458 class C(file):
459 def __init__(self, *args):
460 self.subclass_closed = False
461 file.__init__(self, *args)
462 def close(self):
463 self.subclass_closed = True
464 file.close(self)
465
466 with C(TESTFN, 'w') as f:
467 pass
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000468 self.assertTrue(f.subclass_closed)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000469
470
Victor Stinner6a102812010-04-27 23:55:59 +0000471@unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000472class FileThreadingTests(unittest.TestCase):
473 # These tests check the ability to call various methods of file objects
474 # (including close()) concurrently without crashing the Python interpreter.
475 # See #815646, #595601
476
477 def setUp(self):
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000478 self._threads = test_support.threading_setup()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000479 self.f = None
480 self.filename = TESTFN
481 with open(self.filename, "w") as f:
482 f.write("\n".join("0123456789"))
483 self._count_lock = threading.Lock()
484 self.close_count = 0
485 self.close_success_count = 0
Antoine Pitrou83137c22010-05-17 19:56:59 +0000486 self.use_buffering = False
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000487
488 def tearDown(self):
489 if self.f:
490 try:
491 self.f.close()
492 except (EnvironmentError, ValueError):
493 pass
494 try:
495 os.remove(self.filename)
496 except EnvironmentError:
497 pass
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000498 test_support.threading_cleanup(*self._threads)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000499
500 def _create_file(self):
Antoine Pitrou83137c22010-05-17 19:56:59 +0000501 if self.use_buffering:
502 self.f = open(self.filename, "w+", buffering=1024*16)
503 else:
504 self.f = open(self.filename, "w+")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000505
506 def _close_file(self):
507 with self._count_lock:
508 self.close_count += 1
509 self.f.close()
510 with self._count_lock:
511 self.close_success_count += 1
512
513 def _close_and_reopen_file(self):
514 self._close_file()
515 # if close raises an exception thats fine, self.f remains valid so
516 # we don't need to reopen.
517 self._create_file()
518
519 def _run_workers(self, func, nb_workers, duration=0.2):
520 with self._count_lock:
521 self.close_count = 0
522 self.close_success_count = 0
523 self.do_continue = True
524 threads = []
525 try:
526 for i in range(nb_workers):
527 t = threading.Thread(target=func)
528 t.start()
529 threads.append(t)
530 for _ in xrange(100):
531 time.sleep(duration/100)
532 with self._count_lock:
533 if self.close_count-self.close_success_count > nb_workers+1:
534 if test_support.verbose:
535 print 'Q',
536 break
537 time.sleep(duration)
538 finally:
539 self.do_continue = False
540 for t in threads:
541 t.join()
542
543 def _test_close_open_io(self, io_func, nb_workers=5):
544 def worker():
545 self._create_file()
546 funcs = itertools.cycle((
547 lambda: io_func(),
548 lambda: self._close_and_reopen_file(),
549 ))
550 for f in funcs:
551 if not self.do_continue:
552 break
553 try:
554 f()
555 except (IOError, ValueError):
556 pass
557 self._run_workers(worker, nb_workers)
558 if test_support.verbose:
559 # Useful verbose statistics when tuning this test to take
560 # less time to run but still ensuring that its still useful.
561 #
562 # the percent of close calls that raised an error
563 percent = 100. - 100.*self.close_success_count/self.close_count
564 print self.close_count, ('%.4f ' % percent),
565
566 def test_close_open(self):
567 def io_func():
568 pass
569 self._test_close_open_io(io_func)
570
571 def test_close_open_flush(self):
572 def io_func():
573 self.f.flush()
574 self._test_close_open_io(io_func)
575
576 def test_close_open_iter(self):
577 def io_func():
578 list(iter(self.f))
579 self._test_close_open_io(io_func)
580
581 def test_close_open_isatty(self):
582 def io_func():
583 self.f.isatty()
584 self._test_close_open_io(io_func)
585
586 def test_close_open_print(self):
587 def io_func():
588 print >> self.f, ''
589 self._test_close_open_io(io_func)
590
Antoine Pitrou83137c22010-05-17 19:56:59 +0000591 def test_close_open_print_buffered(self):
592 self.use_buffering = True
593 def io_func():
594 print >> self.f, ''
595 self._test_close_open_io(io_func)
596
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000597 def test_close_open_read(self):
598 def io_func():
599 self.f.read(0)
600 self._test_close_open_io(io_func)
601
602 def test_close_open_readinto(self):
603 def io_func():
604 a = array('c', 'xxxxx')
605 self.f.readinto(a)
606 self._test_close_open_io(io_func)
607
608 def test_close_open_readline(self):
609 def io_func():
610 self.f.readline()
611 self._test_close_open_io(io_func)
612
613 def test_close_open_readlines(self):
614 def io_func():
615 self.f.readlines()
616 self._test_close_open_io(io_func)
617
618 def test_close_open_seek(self):
619 def io_func():
620 self.f.seek(0, 0)
621 self._test_close_open_io(io_func)
622
623 def test_close_open_tell(self):
624 def io_func():
625 self.f.tell()
626 self._test_close_open_io(io_func)
627
628 def test_close_open_truncate(self):
629 def io_func():
630 self.f.truncate()
631 self._test_close_open_io(io_func)
632
633 def test_close_open_write(self):
634 def io_func():
635 self.f.write('')
636 self._test_close_open_io(io_func)
637
638 def test_close_open_writelines(self):
639 def io_func():
640 self.f.writelines('')
641 self._test_close_open_io(io_func)
642
643
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -0700644@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
645class TestFileSignalEINTR(unittest.TestCase):
646 def _test_reading(self, data_to_write, read_and_verify_code, method_name,
647 universal_newlines=False):
648 """Generic buffered read method test harness to verify EINTR behavior.
649
650 Also validates that Python signal handlers are run during the read.
651
652 Args:
653 data_to_write: String to write to the child process for reading
654 before sending it a signal, confirming the signal was handled,
655 writing a final newline char and closing the infile pipe.
656 read_and_verify_code: Single "line" of code to read from a file
657 object named 'infile' and validate the result. This will be
658 executed as part of a python subprocess fed data_to_write.
659 method_name: The name of the read method being tested, for use in
660 an error message on failure.
661 universal_newlines: If True, infile will be opened in universal
662 newline mode in the child process.
663 """
664 if universal_newlines:
665 # Test the \r\n -> \n conversion while we're at it.
666 data_to_write = data_to_write.replace('\n', '\r\n')
667 infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
668 else:
669 infile_setup_code = 'infile = sys.stdin'
670 # Total pipe IO in this function is smaller than the minimum posix OS
671 # pipe buffer size of 512 bytes. No writer should block.
672 assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'
673
674 child_code = (
675 'import os, signal, sys ;'
676 'signal.signal('
677 'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
678 + infile_setup_code + ' ;' +
679 'assert isinstance(infile, file) ;'
680 'sys.stderr.write("Go.\\n") ;'
681 + read_and_verify_code)
682 reader_process = subprocess.Popen(
683 [sys.executable, '-c', child_code],
684 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
685 stderr=subprocess.PIPE)
686 # Wait for the signal handler to be installed.
687 go = reader_process.stderr.read(4)
688 if go != 'Go.\n':
689 reader_process.kill()
690 self.fail('Error from %s process while awaiting "Go":\n%s' % (
691 method_name, go+reader_process.stderr.read()))
692 reader_process.stdin.write(data_to_write)
693 signals_sent = 0
694 rlist = []
695 # We don't know when the read_and_verify_code in our child is actually
696 # executing within the read system call we want to interrupt. This
697 # loop waits for a bit before sending the first signal to increase
698 # the likelihood of that. Implementations without correct EINTR
699 # and signal handling usually fail this test.
700 while not rlist:
701 rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
702 reader_process.send_signal(signal.SIGINT)
703 # Give the subprocess time to handle it before we loop around and
704 # send another one. On OSX the second signal happening close to
705 # immediately after the first was causing the subprocess to crash
706 # via the OS's default SIGINT handler.
707 time.sleep(0.1)
708 signals_sent += 1
709 if signals_sent > 200:
710 reader_process.kill()
711 self.fail("failed to handle signal during %s." % method_name)
712 # This assumes anything unexpected that writes to stderr will also
713 # write a newline. That is true of the traceback printing code.
714 signal_line = reader_process.stderr.readline()
715 if signal_line != '$\n':
716 reader_process.kill()
717 self.fail('Error from %s process while awaiting signal:\n%s' % (
718 method_name, signal_line+reader_process.stderr.read()))
719 # We append a newline to our input so that a readline call can
720 # end on its own before the EOF is seen.
721 stdout, stderr = reader_process.communicate(input='\n')
722 if reader_process.returncode != 0:
723 self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
724 method_name, reader_process.returncode, stdout, stderr))
725
726 def test_readline(self, universal_newlines=False):
727 """file.readline must handle signals and not lose data."""
728 self._test_reading(
729 data_to_write='hello, world!',
730 read_and_verify_code=(
731 'line = infile.readline() ;'
732 'expected_line = "hello, world!\\n" ;'
733 'assert line == expected_line, ('
734 '"read %r expected %r" % (line, expected_line))'
735 ),
736 method_name='readline',
737 universal_newlines=universal_newlines)
738
739 def test_readline_with_universal_newlines(self):
740 self.test_readline(universal_newlines=True)
741
742 def test_readlines(self, universal_newlines=False):
743 """file.readlines must handle signals and not lose data."""
744 self._test_reading(
745 data_to_write='hello\nworld!',
746 read_and_verify_code=(
747 'lines = infile.readlines() ;'
748 'expected_lines = ["hello\\n", "world!\\n"] ;'
749 'assert lines == expected_lines, ('
750 '"readlines returned wrong data.\\n" '
751 '"got lines %r\\nexpected %r" '
752 '% (lines, expected_lines))'
753 ),
754 method_name='readlines',
755 universal_newlines=universal_newlines)
756
757 def test_readlines_with_universal_newlines(self):
758 self.test_readlines(universal_newlines=True)
759
760 def test_readall(self):
761 """Unbounded file.read() must handle signals and not lose data."""
762 self._test_reading(
763 data_to_write='hello, world!abcdefghijklm',
764 read_and_verify_code=(
765 'data = infile.read() ;'
766 'expected_data = "hello, world!abcdefghijklm\\n";'
767 'assert data == expected_data, ('
768 '"read %r expected %r" % (data, expected_data))'
769 ),
770 method_name='unbounded read')
771
772 def test_readinto(self):
773 """file.readinto must handle signals and not lose data."""
774 self._test_reading(
775 data_to_write='hello, world!',
776 read_and_verify_code=(
777 'data = bytearray(50) ;'
778 'num_read = infile.readinto(data) ;'
779 'expected_data = "hello, world!\\n";'
780 'assert data[:num_read] == expected_data, ('
781 '"read %r expected %r" % (data, expected_data))'
782 ),
783 method_name='readinto')
784
785
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000786class StdoutTests(unittest.TestCase):
787
788 def test_move_stdout_on_write(self):
789 # Issue 3242: sys.stdout can be replaced (and freed) during a
790 # print statement; prevent a segfault in this case
791 save_stdout = sys.stdout
792
793 class File:
794 def write(self, data):
795 if '\n' in data:
796 sys.stdout = save_stdout
797
798 try:
799 sys.stdout = File()
800 print "some text"
801 finally:
802 sys.stdout = save_stdout
803
804 def test_del_stdout_before_print(self):
805 # Issue 4597: 'print' with no argument wasn't reporting when
806 # sys.stdout was deleted.
807 save_stdout = sys.stdout
808 del sys.stdout
809 try:
810 print
811 except RuntimeError as e:
Ezio Melotti2623a372010-11-21 13:34:58 +0000812 self.assertEqual(str(e), "lost sys.stdout")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000813 else:
814 self.fail("Expected RuntimeError")
815 finally:
816 sys.stdout = save_stdout
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000817
Victor Stinnercaafd772010-09-08 10:51:01 +0000818 def test_unicode(self):
819 import subprocess
820
821 def get_message(encoding, *code):
822 code = '\n'.join(code)
823 env = os.environ.copy()
824 env['PYTHONIOENCODING'] = encoding
825 process = subprocess.Popen([sys.executable, "-c", code],
826 stdout=subprocess.PIPE, env=env)
827 stdout, stderr = process.communicate()
828 self.assertEqual(process.returncode, 0)
829 return stdout
830
831 def check_message(text, encoding, expected):
832 stdout = get_message(encoding,
833 "import sys",
834 "sys.stdout.write(%r)" % text,
835 "sys.stdout.flush()")
836 self.assertEqual(stdout, expected)
837
Victor Stinner3a68f912010-09-08 11:45:16 +0000838 # test the encoding
839 check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
840 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
841 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
Victor Stinnercaafd772010-09-08 10:51:01 +0000842
Victor Stinner3a68f912010-09-08 11:45:16 +0000843 # test the error handler
844 check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
845 check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
846 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
847
848 # test the buffer API
Victor Stinnercaafd772010-09-08 10:51:01 +0000849 for objtype in ('buffer', 'bytearray'):
850 stdout = get_message('ascii',
851 'import sys',
Victor Stinner3a68f912010-09-08 11:45:16 +0000852 r'sys.stdout.write(%s("\xe9"))' % objtype,
853 'sys.stdout.flush()')
854 self.assertEqual(stdout, "\xe9")
Victor Stinnercaafd772010-09-08 10:51:01 +0000855
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000856
857def test_main():
858 # Historically, these tests have been sloppy about removing TESTFN.
859 # So get rid of it no matter what.
860 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000861 run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -0700862 FileThreadingTests, TestFileSignalEINTR, StdoutTests)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000863 finally:
864 if os.path.exists(TESTFN):
865 os.unlink(TESTFN)
866
867if __name__ == '__main__':
868 test_main()