blob: c73e8d8dc450e4202b87f749ba5301e9b9abcc11 [file] [log] [blame]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00001import sys
2import os
3import unittest
Antoine Pitrou47a5f482009-06-12 20:41:52 +00004import itertools
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -07005import select
6import signal
Serhiy Storchaka66306cf2015-02-15 13:05:10 +02007import stat
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -07008import subprocess
Antoine Pitrou47a5f482009-06-12 20:41:52 +00009import time
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000010from array import array
11from weakref import proxy
Victor Stinner6a102812010-04-27 23:55:59 +000012try:
13 import threading
14except ImportError:
15 threading = None
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000016
Antoine Pitrou47a5f482009-06-12 20:41:52 +000017from test import test_support
Serhiy Storchaka80f0c822014-10-12 17:13:06 +030018from test.test_support import TESTFN, run_unittest, requires
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000019from UserList import UserList
20
class AutoFileTests(unittest.TestCase):
    # file tests for which a test file is automatically set up

    def setUp(self):
        # Every test starts with TESTFN freshly opened for binary writing.
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # Tests that dispose of the file object themselves set self.f to
        # None; otherwise close whatever is still referenced.
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Drop the only strong reference so that the proxy goes dead.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        with test_support.check_py3k_warnings():
            softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        with test_support.check_py3k_warnings():
            # verify softspace is writable
            f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEqual('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1, 2, 3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testWritelinesBuffer(self):
        # verify writelines with an array('c') element
        self.f.writelines([array('c', 'abc')])
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, 'abc')

    def testRepr(self):
        # verify repr works
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
        # see issue #14161
        # Windows doesn't like \r\n\t" in the file name, but ' is ok
        fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
        with open(fname, 'w') as f:
            self.addCleanup(os.remove, fname)
            self.assertTrue(repr(f).startswith(
                "<open file %r, mode 'w' at" % fname))

    def testErrors(self):
        # Reopen the test file for reading and check basic state reporting
        # plus the TypeError raised by readinto() on a str argument.
        self.f.close()
        self.f = open(TESTFN, 'rb')
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # Every I/O method must raise ValueError once the file is closed.
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', '__iter__']
        deprecated_methods = ['xreadlines']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        with test_support.check_py3k_warnings():
            for methodname in deprecated_methods:
                method = getattr(self.f, methodname)
                self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertIsNone(self.f.__exit__(None, None, None))
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            # Only the deliberately provoked error is caught here, so an
            # unrelated failure is never mistaken for it.
            self.assertIsNone(self.f.__exit__(*sys.exc_info()))

    def testReadWhenWriting(self):
        # Reading from a file opened write-only must fail.
        self.assertRaises(IOError, self.f.read)

    def testNastyWritelinesGenerator(self):
        # The generator closes the file part-way through writelines().
        def nasty():
            for i in range(5):
                if i == 3:
                    self.f.close()
                yield str(i)
        self.assertRaises(ValueError, self.f.writelines, nasty())

    def testIssue5677(self):
        # Remark: Do not perform more than one test per open file,
        # since that does NOT catch the readline error on Windows.
        data = 'xxx'
        for mode in ['w', 'wb', 'a', 'ab']:
            for attr in ['read', 'readline', 'readlines']:
                self.f = open(TESTFN, mode)
                self.f.write(data)
                self.assertRaises(IOError, getattr(self.f, attr))
                self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, lambda: [line for line in self.f])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
            self.f.close()

        for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.write, data)
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.writelines, [data, data])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.truncate)
            self.f.close()
197
class OtherFileTests(unittest.TestCase):
    # file tests that manage their own files (no automatic fixture)

    def testOpenDir(self):
        # Opening a directory must raise IOError carrying the path.
        this_dir = os.path.dirname(__file__) or os.curdir
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                f.close()
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdinSeek(self):
        if sys.platform == 'osf1V5':
            # This causes the interpreter to exit on OSF1 v5.1.
            self.skipTest('Skipping sys.stdin.seek(-1), it may crash '
                          'the interpreter. Test manually.')

        if not sys.stdin.isatty():
            # Issue #23168: if stdin is redirected to a file, stdin becomes
            # seekable
            self.skipTest('stdin must be a TTY in this test')

        self.assertRaises(IOError, sys.stdin.seek, -1)

    def testStdinTruncate(self):
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # Release the handle and the file even when the check fails.
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        self.addCleanup(test_support.unlink, TESTFN)
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # Start from a clean slate.  A leftover TESTFN may or may not exist
        # depending on which tests ran before, so a missing file is fine.
        test_support.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            f = open(TESTFN, 'rb+')
            data = f.read(5)
            if data != '12345':
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            test_support.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = open(TESTFN)
                if f.next() != filler:
                    self.fail("Broken testfile")
                meth = getattr(f, methodname)
                try:
                    meth(*args)
                except ValueError:
                    pass
                else:
                    self.fail("%s%r after next() didn't raise ValueError" %
                              (methodname, args))
                f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = open(TESTFN)
            for i in range(nchunks):
                f.next()
            testline = testlines.pop(0)
            try:
                line = f.readline()
            except ValueError:
                self.fail("readline() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("readline() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            testline = testlines.pop(0)
            buf = array("c", "\x00" * len(testline))
            try:
                f.readinto(buf)
            except ValueError:
                self.fail("readinto() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            line = buf.tostring()
            if line != testline:
                self.fail("readinto() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))

            testline = testlines.pop(0)
            try:
                line = f.read(len(testline))
            except ValueError:
                self.fail("read() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("read() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            try:
                lines = f.readlines()
            except ValueError:
                self.fail("readlines() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if lines != testlines:
                # Report the lists that were actually compared.
                self.fail("readlines() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (lines, testlines))
            f.close()
            # Reading after iteration hit EOF shouldn't hurt either
            f = open(TESTFN)
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)

    @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
    def test_write_full(self):
        # Writes to /dev/full must surface as IOError in line-buffered and
        # unbuffered mode alike.
        devfull = '/dev/full'
        if not (os.path.exists(devfull) and
                stat.S_ISCHR(os.stat(devfull).st_mode)):
            # Issue #21934: OpenBSD does not have a /dev/full character device
            self.skipTest('requires %r' % devfull)
        with open(devfull, 'wb', 1) as f:
            with self.assertRaises(IOError):
                f.write('hello\n')
        with open(devfull, 'wb', 1) as f:
            with self.assertRaises(IOError):
                # Issue #17976
                f.write('hello')
                f.write('\n')
        with open(devfull, 'wb', 0) as f:
            with self.assertRaises(IOError):
                f.write('h')

    @unittest.skipUnless(sys.maxsize > 2**31, "requires 64-bit system")
    @test_support.precisionbigmemtest(2**31, 2.5, dry_run=False)
    def test_very_long_line(self, size):
        # Issue #22526
        requires('largefile')
        # The file is `size` bytes long; never leave it behind.
        self.addCleanup(test_support.unlink, TESTFN)
        with open(TESTFN, "wb") as fp:
            fp.seek(size - 1)
            fp.write("\0")
        with open(TESTFN, "rb") as fp:
            for l in fp:
                pass
        self.assertEqual(len(l), size)
        self.assertEqual(l.count("\0"), size)
        # Drop the reference to the huge string promptly.
        l = None
465
class FileSubclassTests(unittest.TestCase):
    # behaviour of user-defined subclasses of the built-in file type

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)

            def close(self):
                self.subclass_closed = True
                file.close(self)

        # Opening in 'w' mode creates TESTFN; remove it once the test ends.
        self.addCleanup(test_support.unlink, TESTFN)
        with C(TESTFN, 'w') as f:
            pass
        self.assertTrue(f.subclass_closed)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000481
482
Victor Stinner6a102812010-04-27 23:55:59 +0000483@unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000484class FileThreadingTests(unittest.TestCase):
485 # These tests check the ability to call various methods of file objects
486 # (including close()) concurrently without crashing the Python interpreter.
487 # See #815646, #595601
488
489 def setUp(self):
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000490 self._threads = test_support.threading_setup()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000491 self.f = None
492 self.filename = TESTFN
493 with open(self.filename, "w") as f:
494 f.write("\n".join("0123456789"))
495 self._count_lock = threading.Lock()
496 self.close_count = 0
497 self.close_success_count = 0
Antoine Pitrou83137c22010-05-17 19:56:59 +0000498 self.use_buffering = False
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000499
500 def tearDown(self):
501 if self.f:
502 try:
503 self.f.close()
504 except (EnvironmentError, ValueError):
505 pass
506 try:
507 os.remove(self.filename)
508 except EnvironmentError:
509 pass
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000510 test_support.threading_cleanup(*self._threads)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000511
512 def _create_file(self):
Antoine Pitrou83137c22010-05-17 19:56:59 +0000513 if self.use_buffering:
514 self.f = open(self.filename, "w+", buffering=1024*16)
515 else:
516 self.f = open(self.filename, "w+")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000517
518 def _close_file(self):
519 with self._count_lock:
520 self.close_count += 1
521 self.f.close()
522 with self._count_lock:
523 self.close_success_count += 1
524
525 def _close_and_reopen_file(self):
526 self._close_file()
527 # if close raises an exception thats fine, self.f remains valid so
528 # we don't need to reopen.
529 self._create_file()
530
531 def _run_workers(self, func, nb_workers, duration=0.2):
532 with self._count_lock:
533 self.close_count = 0
534 self.close_success_count = 0
535 self.do_continue = True
536 threads = []
537 try:
538 for i in range(nb_workers):
539 t = threading.Thread(target=func)
540 t.start()
541 threads.append(t)
542 for _ in xrange(100):
543 time.sleep(duration/100)
544 with self._count_lock:
545 if self.close_count-self.close_success_count > nb_workers+1:
546 if test_support.verbose:
547 print 'Q',
548 break
549 time.sleep(duration)
550 finally:
551 self.do_continue = False
552 for t in threads:
553 t.join()
554
555 def _test_close_open_io(self, io_func, nb_workers=5):
556 def worker():
557 self._create_file()
558 funcs = itertools.cycle((
559 lambda: io_func(),
560 lambda: self._close_and_reopen_file(),
561 ))
562 for f in funcs:
563 if not self.do_continue:
564 break
565 try:
566 f()
567 except (IOError, ValueError):
568 pass
569 self._run_workers(worker, nb_workers)
570 if test_support.verbose:
571 # Useful verbose statistics when tuning this test to take
572 # less time to run but still ensuring that its still useful.
573 #
574 # the percent of close calls that raised an error
575 percent = 100. - 100.*self.close_success_count/self.close_count
576 print self.close_count, ('%.4f ' % percent),
577
578 def test_close_open(self):
579 def io_func():
580 pass
581 self._test_close_open_io(io_func)
582
583 def test_close_open_flush(self):
584 def io_func():
585 self.f.flush()
586 self._test_close_open_io(io_func)
587
588 def test_close_open_iter(self):
589 def io_func():
590 list(iter(self.f))
591 self._test_close_open_io(io_func)
592
593 def test_close_open_isatty(self):
594 def io_func():
595 self.f.isatty()
596 self._test_close_open_io(io_func)
597
598 def test_close_open_print(self):
599 def io_func():
600 print >> self.f, ''
601 self._test_close_open_io(io_func)
602
Antoine Pitrou83137c22010-05-17 19:56:59 +0000603 def test_close_open_print_buffered(self):
604 self.use_buffering = True
605 def io_func():
606 print >> self.f, ''
607 self._test_close_open_io(io_func)
608
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000609 def test_close_open_read(self):
610 def io_func():
611 self.f.read(0)
612 self._test_close_open_io(io_func)
613
614 def test_close_open_readinto(self):
615 def io_func():
616 a = array('c', 'xxxxx')
617 self.f.readinto(a)
618 self._test_close_open_io(io_func)
619
620 def test_close_open_readline(self):
621 def io_func():
622 self.f.readline()
623 self._test_close_open_io(io_func)
624
625 def test_close_open_readlines(self):
626 def io_func():
627 self.f.readlines()
628 self._test_close_open_io(io_func)
629
630 def test_close_open_seek(self):
631 def io_func():
632 self.f.seek(0, 0)
633 self._test_close_open_io(io_func)
634
635 def test_close_open_tell(self):
636 def io_func():
637 self.f.tell()
638 self._test_close_open_io(io_func)
639
640 def test_close_open_truncate(self):
641 def io_func():
642 self.f.truncate()
643 self._test_close_open_io(io_func)
644
645 def test_close_open_write(self):
646 def io_func():
647 self.f.write('')
648 self._test_close_open_io(io_func)
649
650 def test_close_open_writelines(self):
651 def io_func():
652 self.f.writelines('')
653 self._test_close_open_io(io_func)
654
Serhiy Storchaka6401e562017-11-10 12:58:55 +0200655 def test_iteration_torture(self):
Benjamin Petersondbf52e02018-01-02 09:25:41 -0800656 # bpo-31530
Serhiy Storchaka6401e562017-11-10 12:58:55 +0200657 with open(self.filename, "wb") as fp:
658 for i in xrange(2**20):
659 fp.write(b"0"*50 + b"\n")
660 with open(self.filename, "rb") as f:
Benjamin Petersondbf52e02018-01-02 09:25:41 -0800661 def it():
662 for l in f:
Serhiy Storchaka6401e562017-11-10 12:58:55 +0200663 pass
Benjamin Petersondbf52e02018-01-02 09:25:41 -0800664 self._run_workers(it, 10)
Serhiy Storchaka6401e562017-11-10 12:58:55 +0200665
666 def test_iteration_seek(self):
667 # bpo-31530: Crash when concurrently seek and iterate over a file.
668 with open(self.filename, "wb") as fp:
669 for i in xrange(10000):
670 fp.write(b"0"*50 + b"\n")
671 with open(self.filename, "rb") as f:
672 it = iter([1] + [0]*10) # one thread reads, others seek
673 def iterate():
Benjamin Petersondbf52e02018-01-02 09:25:41 -0800674 if next(it):
675 for l in f:
676 pass
677 else:
678 for i in xrange(100):
679 f.seek(i*100, 0)
Serhiy Storchaka6401e562017-11-10 12:58:55 +0200680 self._run_workers(iterate, 10)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000681
Benjamin Petersondbf52e02018-01-02 09:25:41 -0800682
Gregory P. Smithb2ac4d62012-06-25 20:57:36 -0700683@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
684class TestFileSignalEINTR(unittest.TestCase):
685 def _test_reading(self, data_to_write, read_and_verify_code, method_name,
686 universal_newlines=False):
687 """Generic buffered read method test harness to verify EINTR behavior.
688
689 Also validates that Python signal handlers are run during the read.
690
691 Args:
692 data_to_write: String to write to the child process for reading
693 before sending it a signal, confirming the signal was handled,
694 writing a final newline char and closing the infile pipe.
695 read_and_verify_code: Single "line" of code to read from a file
696 object named 'infile' and validate the result. This will be
697 executed as part of a python subprocess fed data_to_write.
698 method_name: The name of the read method being tested, for use in
699 an error message on failure.
700 universal_newlines: If True, infile will be opened in universal
701 newline mode in the child process.
702 """
703 if universal_newlines:
704 # Test the \r\n -> \n conversion while we're at it.
705 data_to_write = data_to_write.replace('\n', '\r\n')
706 infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
707 else:
708 infile_setup_code = 'infile = sys.stdin'
709 # Total pipe IO in this function is smaller than the minimum posix OS
710 # pipe buffer size of 512 bytes. No writer should block.
711 assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'
712
713 child_code = (
714 'import os, signal, sys ;'
715 'signal.signal('
716 'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
717 + infile_setup_code + ' ;' +
718 'assert isinstance(infile, file) ;'
719 'sys.stderr.write("Go.\\n") ;'
720 + read_and_verify_code)
721 reader_process = subprocess.Popen(
722 [sys.executable, '-c', child_code],
723 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
724 stderr=subprocess.PIPE)
725 # Wait for the signal handler to be installed.
726 go = reader_process.stderr.read(4)
727 if go != 'Go.\n':
728 reader_process.kill()
729 self.fail('Error from %s process while awaiting "Go":\n%s' % (
730 method_name, go+reader_process.stderr.read()))
731 reader_process.stdin.write(data_to_write)
732 signals_sent = 0
733 rlist = []
734 # We don't know when the read_and_verify_code in our child is actually
735 # executing within the read system call we want to interrupt. This
736 # loop waits for a bit before sending the first signal to increase
737 # the likelihood of that. Implementations without correct EINTR
738 # and signal handling usually fail this test.
739 while not rlist:
740 rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
741 reader_process.send_signal(signal.SIGINT)
742 # Give the subprocess time to handle it before we loop around and
743 # send another one. On OSX the second signal happening close to
744 # immediately after the first was causing the subprocess to crash
745 # via the OS's default SIGINT handler.
746 time.sleep(0.1)
747 signals_sent += 1
748 if signals_sent > 200:
749 reader_process.kill()
750 self.fail("failed to handle signal during %s." % method_name)
751 # This assumes anything unexpected that writes to stderr will also
752 # write a newline. That is true of the traceback printing code.
753 signal_line = reader_process.stderr.readline()
754 if signal_line != '$\n':
755 reader_process.kill()
756 self.fail('Error from %s process while awaiting signal:\n%s' % (
757 method_name, signal_line+reader_process.stderr.read()))
758 # We append a newline to our input so that a readline call can
759 # end on its own before the EOF is seen.
760 stdout, stderr = reader_process.communicate(input='\n')
761 if reader_process.returncode != 0:
762 self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
763 method_name, reader_process.returncode, stdout, stderr))
764
def test_readline(self, universal_newlines=False):
    """file.readline must handle signals and not lose data.

    The child reads a single line and compares it with the payload plus
    a trailing newline; the newline is expected because the driver sends
    one to the child after the signal has been acknowledged.
    universal_newlines is forwarded unchanged to self._test_reading.
    """
    payload = 'hello, world!'
    # Statements executed by the child process, separated by ';'.
    child_check = (
        'line = infile.readline() ;'
        'expected_line = "hello, world!\\n" ;'
        'assert line == expected_line, ('
        '"read %r expected %r" % (line, expected_line))'
    )
    self._test_reading(
        data_to_write=payload,
        read_and_verify_code=child_check,
        method_name='readline',
        universal_newlines=universal_newlines)
777
def test_readline_with_universal_newlines(self):
    # Re-run the readline scenario with the universal_newlines flag
    # switched on; everything else is shared with test_readline.
    enable_universal = True
    self.test_readline(universal_newlines=enable_universal)
780
def test_readlines(self, universal_newlines=False):
    """file.readlines must handle signals and not lose data.

    The payload holds two lines, the second without a terminator; the
    child expects both lines newline-terminated because the driver sends
    a final newline after the signal has been acknowledged.
    universal_newlines is forwarded unchanged to self._test_reading.
    """
    payload = 'hello\nworld!'
    # Statements executed by the child process, separated by ';'.
    child_check = (
        'lines = infile.readlines() ;'
        'expected_lines = ["hello\\n", "world!\\n"] ;'
        'assert lines == expected_lines, ('
        '"readlines returned wrong data.\\n" '
        '"got lines %r\\nexpected %r" '
        '% (lines, expected_lines))'
    )
    self._test_reading(
        data_to_write=payload,
        read_and_verify_code=child_check,
        method_name='readlines',
        universal_newlines=universal_newlines)
795
def test_readlines_with_universal_newlines(self):
    # Re-run the readlines scenario with the universal_newlines flag
    # switched on; everything else is shared with test_readlines.
    enable_universal = True
    self.test_readlines(universal_newlines=enable_universal)
798
def test_readall(self):
    """Unbounded file.read() must handle signals and not lose data.

    The child reads until EOF and expects the payload followed by the
    newline that the driver sends after the signal has been acknowledged.
    """
    payload = 'hello, world!abcdefghijklm'
    # Statements executed by the child process, separated by ';'.
    child_check = (
        'data = infile.read() ;'
        'expected_data = "hello, world!abcdefghijklm\\n";'
        'assert data == expected_data, ('
        '"read %r expected %r" % (data, expected_data))'
    )
    self._test_reading(
        data_to_write=payload,
        read_and_verify_code=child_check,
        method_name='unbounded read')
810
def test_readinto(self):
    """file.readinto must handle signals and not lose data.

    The child fills a 50-byte scratch bytearray through readinto() and
    compares only the first num_read bytes with the payload plus the
    newline that the driver sends after the signal has been acknowledged.
    """
    self._test_reading(
        data_to_write='hello, world!',
        read_and_verify_code=(
            'data = bytearray(50) ;'
            'num_read = infile.readinto(data) ;'
            'expected_data = "hello, world!\\n";'
            'assert data[:num_read] == expected_data, ('
            # Report the same slice that was compared; formatting the
            # whole buffer would bury the mismatch in the untouched NUL
            # padding of the 50-byte scratch area.
            '"read %r expected %r" % (data[:num_read], expected_data))'
        ),
        method_name='readinto')
823
824
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000825class StdoutTests(unittest.TestCase):
826
827 def test_move_stdout_on_write(self):
828 # Issue 3242: sys.stdout can be replaced (and freed) during a
829 # print statement; prevent a segfault in this case
830 save_stdout = sys.stdout
831
832 class File:
833 def write(self, data):
834 if '\n' in data:
835 sys.stdout = save_stdout
836
837 try:
838 sys.stdout = File()
839 print "some text"
840 finally:
841 sys.stdout = save_stdout
842
843 def test_del_stdout_before_print(self):
844 # Issue 4597: 'print' with no argument wasn't reporting when
845 # sys.stdout was deleted.
846 save_stdout = sys.stdout
847 del sys.stdout
848 try:
849 print
850 except RuntimeError as e:
Ezio Melotti2623a372010-11-21 13:34:58 +0000851 self.assertEqual(str(e), "lost sys.stdout")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000852 else:
853 self.fail("Expected RuntimeError")
854 finally:
855 sys.stdout = save_stdout
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000856
Victor Stinnercaafd772010-09-08 10:51:01 +0000857 def test_unicode(self):
858 import subprocess
859
860 def get_message(encoding, *code):
861 code = '\n'.join(code)
862 env = os.environ.copy()
863 env['PYTHONIOENCODING'] = encoding
864 process = subprocess.Popen([sys.executable, "-c", code],
865 stdout=subprocess.PIPE, env=env)
866 stdout, stderr = process.communicate()
867 self.assertEqual(process.returncode, 0)
868 return stdout
869
870 def check_message(text, encoding, expected):
871 stdout = get_message(encoding,
872 "import sys",
873 "sys.stdout.write(%r)" % text,
874 "sys.stdout.flush()")
875 self.assertEqual(stdout, expected)
876
Victor Stinner3a68f912010-09-08 11:45:16 +0000877 # test the encoding
878 check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
879 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
880 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
Victor Stinnercaafd772010-09-08 10:51:01 +0000881
Victor Stinner3a68f912010-09-08 11:45:16 +0000882 # test the error handler
883 check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
884 check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
885 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
886
887 # test the buffer API
Victor Stinnercaafd772010-09-08 10:51:01 +0000888 for objtype in ('buffer', 'bytearray'):
889 stdout = get_message('ascii',
890 'import sys',
Victor Stinner3a68f912010-09-08 11:45:16 +0000891 r'sys.stdout.write(%s("\xe9"))' % objtype,
892 'sys.stdout.flush()')
893 self.assertEqual(stdout, "\xe9")
Victor Stinnercaafd772010-09-08 10:51:01 +0000894
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000895
def test_main():
    # Run every test class of this module, then delete TESTFN if any
    # test left it behind (historically they have been sloppy about it),
    # whether or not the run raised.
    try:
        test_classes = (
            AutoFileTests,
            OtherFileTests,
            FileSubclassTests,
            FileThreadingTests,
            TestFileSignalEINTR,
            StdoutTests,
        )
        run_unittest(*test_classes)
    finally:
        leftover = os.path.exists(TESTFN)
        if leftover:
            os.unlink(TESTFN)
905
# Allow running this test module directly as a script.
if '__main__' == __name__:
    test_main()