blob: c5dd1a6b5acd8733323bd7a64361dab97d6ef907 [file] [log] [blame]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00001import sys
2import os
3import unittest
Antoine Pitrou47a5f482009-06-12 20:41:52 +00004import itertools
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -07005import select
6import signal
7import subprocess
Antoine Pitrou47a5f482009-06-12 20:41:52 +00008import time
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00009from array import array
10from weakref import proxy
Victor Stinner6a102812010-04-27 23:55:59 +000011try:
12 import threading
13except ImportError:
14 threading = None
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000015
Antoine Pitrou47a5f482009-06-12 20:41:52 +000016from test import test_support
Serhiy Storchaka80f0c822014-10-12 17:13:06 +030017from test.test_support import TESTFN, run_unittest, requires
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000018from UserList import UserList
19
20class AutoFileTests(unittest.TestCase):
21 # file tests for which a test file is automatically set up
22
23 def setUp(self):
Antoine Pitrou47a5f482009-06-12 20:41:52 +000024 self.f = open(TESTFN, 'wb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000025
26 def tearDown(self):
27 if self.f:
28 self.f.close()
29 os.remove(TESTFN)
30
31 def testWeakRefs(self):
32 # verify weak references
33 p = proxy(self.f)
Antoine Pitrou47a5f482009-06-12 20:41:52 +000034 p.write('teststring')
Ezio Melotti2623a372010-11-21 13:34:58 +000035 self.assertEqual(self.f.tell(), p.tell())
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000036 self.f.close()
37 self.f = None
38 self.assertRaises(ReferenceError, getattr, p, 'tell')
39
40 def testAttributes(self):
41 # verify expected attributes exist
42 f = self.f
Ezio Melottid80b4bf2010-03-17 13:52:48 +000043 with test_support.check_py3k_warnings():
44 softspace = f.softspace
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000045 f.name # merely shouldn't blow up
46 f.mode # ditto
47 f.closed # ditto
48
Ezio Melottid80b4bf2010-03-17 13:52:48 +000049 with test_support.check_py3k_warnings():
50 # verify softspace is writable
51 f.softspace = softspace # merely shouldn't blow up
Antoine Pitrou47a5f482009-06-12 20:41:52 +000052
53 # verify the others aren't
54 for attr in 'name', 'mode', 'closed':
55 self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
56
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000057 def testReadinto(self):
58 # verify readinto
Antoine Pitrou47a5f482009-06-12 20:41:52 +000059 self.f.write('12')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000060 self.f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +000061 a = array('c', 'x'*10)
62 self.f = open(TESTFN, 'rb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000063 n = self.f.readinto(a)
Ezio Melotti2623a372010-11-21 13:34:58 +000064 self.assertEqual('12', a.tostring()[:n])
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000065
66 def testWritelinesUserList(self):
67 # verify writelines with instance sequence
Antoine Pitrou47a5f482009-06-12 20:41:52 +000068 l = UserList(['1', '2'])
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000069 self.f.writelines(l)
70 self.f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +000071 self.f = open(TESTFN, 'rb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000072 buf = self.f.read()
Ezio Melotti2623a372010-11-21 13:34:58 +000073 self.assertEqual(buf, '12')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000074
75 def testWritelinesIntegers(self):
76 # verify writelines with integers
77 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
78
79 def testWritelinesIntegersUserList(self):
80 # verify writelines with integers in UserList
81 l = UserList([1,2,3])
82 self.assertRaises(TypeError, self.f.writelines, l)
83
84 def testWritelinesNonString(self):
85 # verify writelines with non-string object
86 class NonString:
87 pass
88
89 self.assertRaises(TypeError, self.f.writelines,
90 [NonString(), NonString()])
91
Antoine Pitroub0acc1b2014-05-08 19:26:04 +020092 def testWritelinesBuffer(self):
93 self.f.writelines([array('c', 'abc')])
94 self.f.close()
95 self.f = open(TESTFN, 'rb')
96 buf = self.f.read()
97 self.assertEqual(buf, 'abc')
98
Antoine Pitrou47a5f482009-06-12 20:41:52 +000099 def testRepr(self):
100 # verify repr works
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000101 self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
Ezio Melotti11f8b682012-03-12 01:17:02 +0200102 # see issue #14161
103 # Windows doesn't like \r\n\t" in the file name, but ' is ok
104 fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
105 with open(fname, 'w') as f:
106 self.addCleanup(os.remove, fname)
107 self.assertTrue(repr(f).startswith(
108 "<open file %r, mode 'w' at" % fname))
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000109
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000110 def testErrors(self):
Antoine Pitroubb445a12010-02-05 17:05:54 +0000111 self.f.close()
112 self.f = open(TESTFN, 'rb')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000113 f = self.f
Ezio Melotti2623a372010-11-21 13:34:58 +0000114 self.assertEqual(f.name, TESTFN)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000115 self.assertTrue(not f.isatty())
116 self.assertTrue(not f.closed)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000117
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000118 self.assertRaises(TypeError, f.readinto, "")
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000119 f.close()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000120 self.assertTrue(f.closed)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000121
122 def testMethods(self):
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000123 methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
124 'readline', 'readlines', 'seek', 'tell', 'truncate',
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000125 'write', '__iter__']
126 deprecated_methods = ['xreadlines']
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000127 if sys.platform.startswith('atheos'):
128 methods.remove('truncate')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000129
130 # __exit__ should close the file
131 self.f.__exit__(None, None, None)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000132 self.assertTrue(self.f.closed)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000133
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000134 for methodname in methods:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000135 method = getattr(self.f, methodname)
136 # should raise on closed file
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000137 self.assertRaises(ValueError, method)
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000138 with test_support.check_py3k_warnings():
139 for methodname in deprecated_methods:
140 method = getattr(self.f, methodname)
141 self.assertRaises(ValueError, method)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000142 self.assertRaises(ValueError, self.f.writelines, [])
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000143
144 # file is closed, __exit__ shouldn't do anything
Ezio Melotti2623a372010-11-21 13:34:58 +0000145 self.assertEqual(self.f.__exit__(None, None, None), None)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000146 # it must also return None if an exception was given
147 try:
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000148 1 // 0
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000149 except:
Ezio Melotti2623a372010-11-21 13:34:58 +0000150 self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000151
152 def testReadWhenWriting(self):
153 self.assertRaises(IOError, self.f.read)
154
Benjamin Petersonbf775542010-10-16 19:20:12 +0000155 def testNastyWritelinesGenerator(self):
156 def nasty():
157 for i in range(5):
158 if i == 3:
159 self.f.close()
160 yield str(i)
161 self.assertRaises(ValueError, self.f.writelines, nasty())
162
Antoine Pitroubb445a12010-02-05 17:05:54 +0000163 def testIssue5677(self):
164 # Remark: Do not perform more than one test per open file,
165 # since that does NOT catch the readline error on Windows.
166 data = 'xxx'
167 for mode in ['w', 'wb', 'a', 'ab']:
168 for attr in ['read', 'readline', 'readlines']:
169 self.f = open(TESTFN, mode)
170 self.f.write(data)
171 self.assertRaises(IOError, getattr(self.f, attr))
172 self.f.close()
173
174 self.f = open(TESTFN, mode)
175 self.f.write(data)
176 self.assertRaises(IOError, lambda: [line for line in self.f])
177 self.f.close()
178
179 self.f = open(TESTFN, mode)
180 self.f.write(data)
181 self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
182 self.f.close()
183
184 for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
185 self.f = open(TESTFN, mode)
186 self.assertRaises(IOError, self.f.write, data)
187 self.f.close()
188
189 self.f = open(TESTFN, mode)
190 self.assertRaises(IOError, self.f.writelines, [data, data])
191 self.f.close()
192
193 self.f = open(TESTFN, mode)
194 self.assertRaises(IOError, self.f.truncate)
195 self.f.close()
196
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000197class OtherFileTests(unittest.TestCase):
198
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000199 def testOpenDir(self):
Hirokazu Yamamotofa647ec2010-09-23 15:59:21 +0000200 this_dir = os.path.dirname(__file__) or os.curdir
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000201 for mode in (None, "w"):
202 try:
203 if mode:
204 f = open(this_dir, mode)
205 else:
206 f = open(this_dir)
207 except IOError as e:
208 self.assertEqual(e.filename, this_dir)
209 else:
210 self.fail("opening a directory didn't raise an IOError")
211
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000212 def testModeStrings(self):
213 # check invalid mode strings
214 for mode in ("", "aU", "wU+"):
215 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000216 f = open(TESTFN, mode)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000217 except ValueError:
218 pass
219 else:
220 f.close()
221 self.fail('%r is an invalid file mode' % mode)
222
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000223 # Some invalid modes fail on Windows, but pass on Unix
224 # Issue3965: avoid a crash on Windows when filename is unicode
225 for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
226 try:
227 f = open(name, "rr")
228 except (IOError, ValueError):
229 pass
230 else:
231 f.close()
232
Victor Stinner7ba8cdc2015-01-06 12:39:45 +0100233 def testStdinSeek(self):
234 if sys.platform == 'osf1V5':
235 # This causes the interpreter to exit on OSF1 v5.1.
236 self.skipTest('Skipping sys.stdin.seek(-1), it may crash '
237 'the interpreter. Test manually.')
238
239 if not sys.stdin.isatty():
240 # Issue #23168: if stdin is redirected to a file, stdin becomes
241 # seekable
242 self.skipTest('stdin must be a TTY in this test')
243
244 self.assertRaises(IOError, sys.stdin.seek, -1)
245
246 def testStdinTruncate(self):
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000247 self.assertRaises(IOError, sys.stdin.truncate)
248
249 def testUnicodeOpen(self):
250 # verify repr works for unicode too
251 f = open(unicode(TESTFN), "w")
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000252 self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000253 f.close()
254 os.unlink(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000255
256 def testBadModeArgument(self):
257 # verify that we get a sensible error message for bad mode argument
258 bad_mode = "qwerty"
259 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000260 f = open(TESTFN, bad_mode)
261 except ValueError, msg:
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000262 if msg.args[0] != 0:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000263 s = str(msg)
Ezio Melotti187f93d2010-03-17 14:22:34 +0000264 if TESTFN in s or bad_mode not in s:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000265 self.fail("bad error message for invalid mode: %s" % s)
Ezio Melottid80b4bf2010-03-17 13:52:48 +0000266 # if msg.args[0] == 0, we're probably on Windows where there may
267 # be no obvious way to discover why open() failed.
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000268 else:
269 f.close()
270 self.fail("no error for invalid mode: %s" % bad_mode)
271
272 def testSetBufferSize(self):
273 # make sure that explicitly setting the buffer size doesn't cause
274 # misbehaviour especially with repeated close() calls
275 for s in (-1, 0, 1, 512):
276 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000277 f = open(TESTFN, 'w', s)
278 f.write(str(s))
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000279 f.close()
280 f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000281 f = open(TESTFN, 'r', s)
282 d = int(f.read())
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000283 f.close()
284 f.close()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000285 except IOError, msg:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000286 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
Ezio Melotti2623a372010-11-21 13:34:58 +0000287 self.assertEqual(d, s)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000288
289 def testTruncateOnWindows(self):
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000290 os.unlink(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000291
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000292 def bug801631():
293 # SF bug <http://www.python.org/sf/801631>
294 # "file.truncate fault on windows"
295 f = open(TESTFN, 'wb')
296 f.write('12345678901') # 11 bytes
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000297 f.close()
298
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000299 f = open(TESTFN,'rb+')
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000300 data = f.read(5)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000301 if data != '12345':
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000302 self.fail("Read on file opened for update failed %r" % data)
303 if f.tell() != 5:
304 self.fail("File pos after read wrong %d" % f.tell())
305
306 f.truncate()
307 if f.tell() != 5:
308 self.fail("File pos after ftruncate wrong %d" % f.tell())
309
310 f.close()
311 size = os.path.getsize(TESTFN)
312 if size != 5:
313 self.fail("File size after ftruncate wrong %d" % size)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000314
315 try:
316 bug801631()
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000317 finally:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000318 os.unlink(TESTFN)
319
320 def testIteration(self):
321 # Test the complex interaction when mixing file-iteration and the
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000322 # various read* methods. Ostensibly, the mixture could just be tested
323 # to work when it should work according to the Python language,
324 # instead of fail when it should fail according to the current CPython
325 # implementation. People don't always program Python the way they
326 # should, though, and the implemenation might change in subtle ways,
327 # so we explicitly test for errors, too; the test will just have to
328 # be updated when the implementation changes.
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000329 dataoffset = 16384
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000330 filler = "ham\n"
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000331 assert not dataoffset % len(filler), \
332 "dataoffset must be multiple of len(filler)"
333 nchunks = dataoffset // len(filler)
334 testlines = [
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000335 "spam, spam and eggs\n",
336 "eggs, spam, ham and spam\n",
337 "saussages, spam, spam and eggs\n",
338 "spam, ham, spam and eggs\n",
339 "spam, spam, spam, spam, spam, ham, spam\n",
340 "wonderful spaaaaaam.\n"
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000341 ]
342 methods = [("readline", ()), ("read", ()), ("readlines", ()),
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000343 ("readinto", (array("c", " "*100),))]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000344
345 try:
346 # Prepare the testfile
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000347 bag = open(TESTFN, "w")
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000348 bag.write(filler * nchunks)
349 bag.writelines(testlines)
350 bag.close()
351 # Test for appropriate errors mixing read* and iteration
352 for methodname, args in methods:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000353 f = open(TESTFN)
354 if f.next() != filler:
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000355 self.fail, "Broken testfile"
356 meth = getattr(f, methodname)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000357 try:
358 meth(*args)
359 except ValueError:
360 pass
361 else:
362 self.fail("%s%r after next() didn't raise ValueError" %
363 (methodname, args))
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000364 f.close()
365
366 # Test to see if harmless (by accident) mixing of read* and
367 # iteration still works. This depends on the size of the internal
368 # iteration buffer (currently 8192,) but we can test it in a
369 # flexible manner. Each line in the bag o' ham is 4 bytes
370 # ("h", "a", "m", "\n"), so 4096 lines of that should get us
371 # exactly on the buffer boundary for any power-of-2 buffersize
372 # between 4 and 16384 (inclusive).
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000373 f = open(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000374 for i in range(nchunks):
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000375 f.next()
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000376 testline = testlines.pop(0)
377 try:
378 line = f.readline()
379 except ValueError:
380 self.fail("readline() after next() with supposedly empty "
381 "iteration-buffer failed anyway")
382 if line != testline:
383 self.fail("readline() after next() with empty buffer "
384 "failed. Got %r, expected %r" % (line, testline))
385 testline = testlines.pop(0)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000386 buf = array("c", "\x00" * len(testline))
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000387 try:
388 f.readinto(buf)
389 except ValueError:
390 self.fail("readinto() after next() with supposedly empty "
391 "iteration-buffer failed anyway")
392 line = buf.tostring()
393 if line != testline:
394 self.fail("readinto() after next() with empty buffer "
395 "failed. Got %r, expected %r" % (line, testline))
396
397 testline = testlines.pop(0)
398 try:
399 line = f.read(len(testline))
400 except ValueError:
401 self.fail("read() after next() with supposedly empty "
402 "iteration-buffer failed anyway")
403 if line != testline:
404 self.fail("read() after next() with empty buffer "
405 "failed. Got %r, expected %r" % (line, testline))
406 try:
407 lines = f.readlines()
408 except ValueError:
409 self.fail("readlines() after next() with supposedly empty "
410 "iteration-buffer failed anyway")
411 if lines != testlines:
412 self.fail("readlines() after next() with empty buffer "
413 "failed. Got %r, expected %r" % (line, testline))
414 # Reading after iteration hit EOF shouldn't hurt either
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000415 f = open(TESTFN)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000416 try:
417 for line in f:
418 pass
419 try:
420 f.readline()
421 f.readinto(buf)
422 f.read()
423 f.readlines()
424 except ValueError:
425 self.fail("read* failed after next() consumed file")
426 finally:
427 f.close()
428 finally:
429 os.unlink(TESTFN)
430
Serhiy Storchaka6d562312013-12-17 14:40:06 +0200431 @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
432 def test_write_full(self):
433 # Issue #17976
Serhiy Storchaka84e7e5f2013-12-17 14:53:32 +0200434 try:
435 f = open('/dev/full', 'w', 1)
436 except IOError:
437 self.skipTest("requires '/dev/full'")
438 try:
Serhiy Storchaka6d562312013-12-17 14:40:06 +0200439 with self.assertRaises(IOError):
440 f.write('hello')
441 f.write('\n')
Serhiy Storchaka84e7e5f2013-12-17 14:53:32 +0200442 finally:
443 f.close()
Serhiy Storchaka6d562312013-12-17 14:40:06 +0200444
Benjamin Petersonbc4a8342014-09-30 21:28:27 -0400445 @unittest.skipUnless(sys.maxsize > 2**31, "requires 64-bit system")
Serhiy Storchaka80f0c822014-10-12 17:13:06 +0300446 @test_support.precisionbigmemtest(2**31, 2.5, dry_run=False)
447 def test_very_long_line(self, size):
Benjamin Peterson95bc0e42014-09-30 21:17:15 -0400448 # Issue #22526
Serhiy Storchaka80f0c822014-10-12 17:13:06 +0300449 requires('largefile')
Benjamin Peterson95bc0e42014-09-30 21:17:15 -0400450 with open(TESTFN, "wb") as fp:
Serhiy Storchaka80f0c822014-10-12 17:13:06 +0300451 fp.seek(size - 1)
452 fp.write("\0")
Benjamin Peterson95bc0e42014-09-30 21:17:15 -0400453 with open(TESTFN, "rb") as fp:
454 for l in fp:
455 pass
Serhiy Storchaka80f0c822014-10-12 17:13:06 +0300456 self.assertEqual(len(l), size)
457 self.assertEqual(l.count("\0"), size)
Benjamin Peterson95bc0e42014-09-30 21:17:15 -0400458 l = None
459
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000460class FileSubclassTests(unittest.TestCase):
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000461
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000462 def testExit(self):
463 # test that exiting with context calls subclass' close
464 class C(file):
465 def __init__(self, *args):
466 self.subclass_closed = False
467 file.__init__(self, *args)
468 def close(self):
469 self.subclass_closed = True
470 file.close(self)
471
472 with C(TESTFN, 'w') as f:
473 pass
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000474 self.assertTrue(f.subclass_closed)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000475
476
Victor Stinner6a102812010-04-27 23:55:59 +0000477@unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000478class FileThreadingTests(unittest.TestCase):
479 # These tests check the ability to call various methods of file objects
480 # (including close()) concurrently without crashing the Python interpreter.
481 # See #815646, #595601
482
483 def setUp(self):
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000484 self._threads = test_support.threading_setup()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000485 self.f = None
486 self.filename = TESTFN
487 with open(self.filename, "w") as f:
488 f.write("\n".join("0123456789"))
489 self._count_lock = threading.Lock()
490 self.close_count = 0
491 self.close_success_count = 0
Antoine Pitrou83137c22010-05-17 19:56:59 +0000492 self.use_buffering = False
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000493
494 def tearDown(self):
495 if self.f:
496 try:
497 self.f.close()
498 except (EnvironmentError, ValueError):
499 pass
500 try:
501 os.remove(self.filename)
502 except EnvironmentError:
503 pass
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000504 test_support.threading_cleanup(*self._threads)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000505
506 def _create_file(self):
Antoine Pitrou83137c22010-05-17 19:56:59 +0000507 if self.use_buffering:
508 self.f = open(self.filename, "w+", buffering=1024*16)
509 else:
510 self.f = open(self.filename, "w+")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000511
512 def _close_file(self):
513 with self._count_lock:
514 self.close_count += 1
515 self.f.close()
516 with self._count_lock:
517 self.close_success_count += 1
518
519 def _close_and_reopen_file(self):
520 self._close_file()
521 # if close raises an exception thats fine, self.f remains valid so
522 # we don't need to reopen.
523 self._create_file()
524
525 def _run_workers(self, func, nb_workers, duration=0.2):
526 with self._count_lock:
527 self.close_count = 0
528 self.close_success_count = 0
529 self.do_continue = True
530 threads = []
531 try:
532 for i in range(nb_workers):
533 t = threading.Thread(target=func)
534 t.start()
535 threads.append(t)
536 for _ in xrange(100):
537 time.sleep(duration/100)
538 with self._count_lock:
539 if self.close_count-self.close_success_count > nb_workers+1:
540 if test_support.verbose:
541 print 'Q',
542 break
543 time.sleep(duration)
544 finally:
545 self.do_continue = False
546 for t in threads:
547 t.join()
548
549 def _test_close_open_io(self, io_func, nb_workers=5):
550 def worker():
551 self._create_file()
552 funcs = itertools.cycle((
553 lambda: io_func(),
554 lambda: self._close_and_reopen_file(),
555 ))
556 for f in funcs:
557 if not self.do_continue:
558 break
559 try:
560 f()
561 except (IOError, ValueError):
562 pass
563 self._run_workers(worker, nb_workers)
564 if test_support.verbose:
565 # Useful verbose statistics when tuning this test to take
566 # less time to run but still ensuring that its still useful.
567 #
568 # the percent of close calls that raised an error
569 percent = 100. - 100.*self.close_success_count/self.close_count
570 print self.close_count, ('%.4f ' % percent),
571
572 def test_close_open(self):
573 def io_func():
574 pass
575 self._test_close_open_io(io_func)
576
577 def test_close_open_flush(self):
578 def io_func():
579 self.f.flush()
580 self._test_close_open_io(io_func)
581
582 def test_close_open_iter(self):
583 def io_func():
584 list(iter(self.f))
585 self._test_close_open_io(io_func)
586
587 def test_close_open_isatty(self):
588 def io_func():
589 self.f.isatty()
590 self._test_close_open_io(io_func)
591
592 def test_close_open_print(self):
593 def io_func():
594 print >> self.f, ''
595 self._test_close_open_io(io_func)
596
Antoine Pitrou83137c22010-05-17 19:56:59 +0000597 def test_close_open_print_buffered(self):
598 self.use_buffering = True
599 def io_func():
600 print >> self.f, ''
601 self._test_close_open_io(io_func)
602
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000603 def test_close_open_read(self):
604 def io_func():
605 self.f.read(0)
606 self._test_close_open_io(io_func)
607
608 def test_close_open_readinto(self):
609 def io_func():
610 a = array('c', 'xxxxx')
611 self.f.readinto(a)
612 self._test_close_open_io(io_func)
613
614 def test_close_open_readline(self):
615 def io_func():
616 self.f.readline()
617 self._test_close_open_io(io_func)
618
619 def test_close_open_readlines(self):
620 def io_func():
621 self.f.readlines()
622 self._test_close_open_io(io_func)
623
624 def test_close_open_seek(self):
625 def io_func():
626 self.f.seek(0, 0)
627 self._test_close_open_io(io_func)
628
629 def test_close_open_tell(self):
630 def io_func():
631 self.f.tell()
632 self._test_close_open_io(io_func)
633
634 def test_close_open_truncate(self):
635 def io_func():
636 self.f.truncate()
637 self._test_close_open_io(io_func)
638
639 def test_close_open_write(self):
640 def io_func():
641 self.f.write('')
642 self._test_close_open_io(io_func)
643
644 def test_close_open_writelines(self):
645 def io_func():
646 self.f.writelines('')
647 self._test_close_open_io(io_func)
648
649
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -0700650@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
651class TestFileSignalEINTR(unittest.TestCase):
652 def _test_reading(self, data_to_write, read_and_verify_code, method_name,
653 universal_newlines=False):
654 """Generic buffered read method test harness to verify EINTR behavior.
655
656 Also validates that Python signal handlers are run during the read.
657
658 Args:
659 data_to_write: String to write to the child process for reading
660 before sending it a signal, confirming the signal was handled,
661 writing a final newline char and closing the infile pipe.
662 read_and_verify_code: Single "line" of code to read from a file
663 object named 'infile' and validate the result. This will be
664 executed as part of a python subprocess fed data_to_write.
665 method_name: The name of the read method being tested, for use in
666 an error message on failure.
667 universal_newlines: If True, infile will be opened in universal
668 newline mode in the child process.
669 """
670 if universal_newlines:
671 # Test the \r\n -> \n conversion while we're at it.
672 data_to_write = data_to_write.replace('\n', '\r\n')
673 infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
674 else:
675 infile_setup_code = 'infile = sys.stdin'
676 # Total pipe IO in this function is smaller than the minimum posix OS
677 # pipe buffer size of 512 bytes. No writer should block.
678 assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'
679
680 child_code = (
681 'import os, signal, sys ;'
682 'signal.signal('
683 'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
684 + infile_setup_code + ' ;' +
685 'assert isinstance(infile, file) ;'
686 'sys.stderr.write("Go.\\n") ;'
687 + read_and_verify_code)
688 reader_process = subprocess.Popen(
689 [sys.executable, '-c', child_code],
690 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
691 stderr=subprocess.PIPE)
692 # Wait for the signal handler to be installed.
693 go = reader_process.stderr.read(4)
694 if go != 'Go.\n':
695 reader_process.kill()
696 self.fail('Error from %s process while awaiting "Go":\n%s' % (
697 method_name, go+reader_process.stderr.read()))
698 reader_process.stdin.write(data_to_write)
699 signals_sent = 0
700 rlist = []
701 # We don't know when the read_and_verify_code in our child is actually
702 # executing within the read system call we want to interrupt. This
703 # loop waits for a bit before sending the first signal to increase
704 # the likelihood of that. Implementations without correct EINTR
705 # and signal handling usually fail this test.
706 while not rlist:
707 rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
708 reader_process.send_signal(signal.SIGINT)
709 # Give the subprocess time to handle it before we loop around and
710 # send another one. On OSX the second signal happening close to
711 # immediately after the first was causing the subprocess to crash
712 # via the OS's default SIGINT handler.
713 time.sleep(0.1)
714 signals_sent += 1
715 if signals_sent > 200:
716 reader_process.kill()
717 self.fail("failed to handle signal during %s." % method_name)
718 # This assumes anything unexpected that writes to stderr will also
719 # write a newline. That is true of the traceback printing code.
720 signal_line = reader_process.stderr.readline()
721 if signal_line != '$\n':
722 reader_process.kill()
723 self.fail('Error from %s process while awaiting signal:\n%s' % (
724 method_name, signal_line+reader_process.stderr.read()))
725 # We append a newline to our input so that a readline call can
726 # end on its own before the EOF is seen.
727 stdout, stderr = reader_process.communicate(input='\n')
728 if reader_process.returncode != 0:
729 self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
730 method_name, reader_process.returncode, stdout, stderr))
731
732 def test_readline(self, universal_newlines=False):
733 """file.readline must handle signals and not lose data."""
734 self._test_reading(
735 data_to_write='hello, world!',
736 read_and_verify_code=(
737 'line = infile.readline() ;'
738 'expected_line = "hello, world!\\n" ;'
739 'assert line == expected_line, ('
740 '"read %r expected %r" % (line, expected_line))'
741 ),
742 method_name='readline',
743 universal_newlines=universal_newlines)
744
745 def test_readline_with_universal_newlines(self):
746 self.test_readline(universal_newlines=True)
747
748 def test_readlines(self, universal_newlines=False):
749 """file.readlines must handle signals and not lose data."""
750 self._test_reading(
751 data_to_write='hello\nworld!',
752 read_and_verify_code=(
753 'lines = infile.readlines() ;'
754 'expected_lines = ["hello\\n", "world!\\n"] ;'
755 'assert lines == expected_lines, ('
756 '"readlines returned wrong data.\\n" '
757 '"got lines %r\\nexpected %r" '
758 '% (lines, expected_lines))'
759 ),
760 method_name='readlines',
761 universal_newlines=universal_newlines)
762
763 def test_readlines_with_universal_newlines(self):
764 self.test_readlines(universal_newlines=True)
765
766 def test_readall(self):
767 """Unbounded file.read() must handle signals and not lose data."""
768 self._test_reading(
769 data_to_write='hello, world!abcdefghijklm',
770 read_and_verify_code=(
771 'data = infile.read() ;'
772 'expected_data = "hello, world!abcdefghijklm\\n";'
773 'assert data == expected_data, ('
774 '"read %r expected %r" % (data, expected_data))'
775 ),
776 method_name='unbounded read')
777
778 def test_readinto(self):
779 """file.readinto must handle signals and not lose data."""
780 self._test_reading(
781 data_to_write='hello, world!',
782 read_and_verify_code=(
783 'data = bytearray(50) ;'
784 'num_read = infile.readinto(data) ;'
785 'expected_data = "hello, world!\\n";'
786 'assert data[:num_read] == expected_data, ('
787 '"read %r expected %r" % (data, expected_data))'
788 ),
789 method_name='readinto')
790
791
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000792class StdoutTests(unittest.TestCase):
793
794 def test_move_stdout_on_write(self):
795 # Issue 3242: sys.stdout can be replaced (and freed) during a
796 # print statement; prevent a segfault in this case
797 save_stdout = sys.stdout
798
799 class File:
800 def write(self, data):
801 if '\n' in data:
802 sys.stdout = save_stdout
803
804 try:
805 sys.stdout = File()
806 print "some text"
807 finally:
808 sys.stdout = save_stdout
809
810 def test_del_stdout_before_print(self):
811 # Issue 4597: 'print' with no argument wasn't reporting when
812 # sys.stdout was deleted.
813 save_stdout = sys.stdout
814 del sys.stdout
815 try:
816 print
817 except RuntimeError as e:
Ezio Melotti2623a372010-11-21 13:34:58 +0000818 self.assertEqual(str(e), "lost sys.stdout")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000819 else:
820 self.fail("Expected RuntimeError")
821 finally:
822 sys.stdout = save_stdout
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000823
Victor Stinnercaafd772010-09-08 10:51:01 +0000824 def test_unicode(self):
825 import subprocess
826
827 def get_message(encoding, *code):
828 code = '\n'.join(code)
829 env = os.environ.copy()
830 env['PYTHONIOENCODING'] = encoding
831 process = subprocess.Popen([sys.executable, "-c", code],
832 stdout=subprocess.PIPE, env=env)
833 stdout, stderr = process.communicate()
834 self.assertEqual(process.returncode, 0)
835 return stdout
836
837 def check_message(text, encoding, expected):
838 stdout = get_message(encoding,
839 "import sys",
840 "sys.stdout.write(%r)" % text,
841 "sys.stdout.flush()")
842 self.assertEqual(stdout, expected)
843
Victor Stinner3a68f912010-09-08 11:45:16 +0000844 # test the encoding
845 check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
846 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
847 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
Victor Stinnercaafd772010-09-08 10:51:01 +0000848
Victor Stinner3a68f912010-09-08 11:45:16 +0000849 # test the error handler
850 check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
851 check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
852 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
853
854 # test the buffer API
Victor Stinnercaafd772010-09-08 10:51:01 +0000855 for objtype in ('buffer', 'bytearray'):
856 stdout = get_message('ascii',
857 'import sys',
Victor Stinner3a68f912010-09-08 11:45:16 +0000858 r'sys.stdout.write(%s("\xe9"))' % objtype,
859 'sys.stdout.flush()')
860 self.assertEqual(stdout, "\xe9")
Victor Stinnercaafd772010-09-08 10:51:01 +0000861
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000862
863def test_main():
864 # Historically, these tests have been sloppy about removing TESTFN.
865 # So get rid of it no matter what.
866 try:
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000867 run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -0700868 FileThreadingTests, TestFileSignalEINTR, StdoutTests)
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000869 finally:
870 if os.path.exists(TESTFN):
871 os.unlink(TESTFN)
872
873if __name__ == '__main__':
874 test_main()