blob: 96f6da202355a0b6c4c20b47b2905ebd55d69a1f [file] [log] [blame]
Martin v. Löwisf90ae202002-06-11 06:22:31 +00001import sys
Fred Drake2ec80fa2000-10-23 16:59:35 +00002import os
Georg Brandl442b49e2006-06-08 14:50:53 +00003import unittest
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +00004import itertools
5import time
6import threading
Neal Norwitz62f5a9d2002-04-01 00:09:00 +00007from array import array
Raymond Hettingercb87bc82004-05-31 00:35:52 +00008from weakref import proxy
Fred Drake2ec80fa2000-10-23 16:59:35 +00009
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +000010from test import test_support
Georg Brandl442b49e2006-06-08 14:50:53 +000011from test.test_support import TESTFN, findfile, run_unittest
Marc-André Lemburgfa44d792000-08-25 22:37:31 +000012from UserList import UserList
13
class AutoFileTests(unittest.TestCase):
    # file tests for which a test file is automatically set up
    #
    # setUp() opens TESTFN for binary writing and stores the file object in
    # self.f.  tearDown() closes whatever self.f currently refers to (tests
    # are free to rebind it, or to set it to None) and removes TESTFN.

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # self.f is None when a test deliberately dropped its reference
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEquals(self.f.tell(), p.tell())
        self.f.close()
        self.f = None
        # with self.f rebound to None the proxy is expected to be dead
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        # verify softspace is writable
        f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        # reopen for reading; tearDown() closes this second handle
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEquals('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEquals(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testRepr(self):
        # verify repr works
        self.assert_(repr(self.f).startswith("<open file '" + TESTFN))

    def testErrors(self):
        # verify basic state of a freshly opened file, that readinto()
        # rejects a str argument, and that close() flips .closed
        f = self.f
        self.assertEquals(f.name, TESTFN)
        self.assert_(not f.isatty())
        self.assert_(not f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assert_(f.closed)

    def testMethods(self):
        # verify that every listed method raises ValueError on a closed file
        # and that __exit__ closes the file and always returns None
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', 'xreadlines', '__iter__']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assert_(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertEquals(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1/0
        except ZeroDivisionError:
            # only the deliberately provoked error is caught, so that
            # sys.exc_info() describes a live exception for __exit__
            self.assertEquals(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        # verify that reading from a file opened write-only raises IOError
        self.assertRaises(IOError, self.f.read)
Georg Brandl442b49e2006-06-08 14:50:53 +0000125
class OtherFileTests(unittest.TestCase):
    # file tests that create TESTFN themselves (there is no setUp() or
    # tearDown() here), so each test cleans up the file it creates

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except IOError:
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assert_(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # close and remove the file even when the assertion fails
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as err:
            if err.args[0] != 0:
                s = str(err)
                # the message must mention the mode but not the file name
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if err.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        try:
            for s in (-1, 0, 1, 512):
                try:
                    f = open(TESTFN, 'w', s)
                    f.write(str(s))
                    f.close()
                    f.close()
                    f = open(TESTFN, 'r', s)
                    d = int(f.read())
                    f.close()
                    f.close()
                except IOError as msg:
                    self.fail('error setting buffer size %d: %s' % (s, str(msg)))
                self.assertEquals(d, s)
        finally:
            # don't leave TESTFN behind for whichever test runs next
            test_support.unlink(TESTFN)

    def testTruncateOnWindows(self):
        # Start without TESTFN.  test_support.unlink() is used instead of
        # os.unlink() so the test no longer requires that an earlier test
        # left the file behind.
        test_support.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            f = open(TESTFN,'rb+')
            try:
                data = f.read(5)
                if data != '12345':
                    self.fail("Read on file opened for update failed %r" % data)
                if f.tell() != 5:
                    self.fail("File pos after read wrong %d" % f.tell())

                f.truncate()
                if f.tell() != 5:
                    self.fail("File pos after ftruncate wrong %d" % f.tell())
            finally:
                # release the handle even when a check above failed, so the
                # caller's os.unlink() is not blocked by an open file
                f.close()

            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        # a unittest assertion rather than a bare assert, so the sanity
        # check is not stripped when running under -O
        self.assertEquals(dataoffset % len(filler), 0,
                          "dataoffset must be multiple of len(filler)")
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = open(TESTFN)
                try:
                    if f.next() != filler:
                        # this used to be the no-op expression
                        # `self.fail, "Broken testfile"`
                        self.fail("Broken testfile")
                    meth = getattr(f, methodname)
                    try:
                        meth(*args)
                    except ValueError:
                        pass
                    else:
                        self.fail("%s%r after next() didn't raise ValueError" %
                                  (methodname, args))
                finally:
                    f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = open(TESTFN)
            try:
                for i in range(nchunks):
                    f.next()
                testline = testlines.pop(0)
                try:
                    line = f.readline()
                except ValueError:
                    self.fail("readline() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("readline() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                testline = testlines.pop(0)
                buf = array("c", "\x00" * len(testline))
                try:
                    f.readinto(buf)
                except ValueError:
                    self.fail("readinto() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                line = buf.tostring()
                if line != testline:
                    self.fail("readinto() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                testline = testlines.pop(0)
                try:
                    line = f.read(len(testline))
                except ValueError:
                    self.fail("read() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("read() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                try:
                    lines = f.readlines()
                except ValueError:
                    self.fail("readlines() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if lines != testlines:
                    # report the values that were actually compared
                    self.fail("readlines() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (lines, testlines))
            finally:
                f.close()
            # Reading after iteration hit EOF shouldn't hurt either
            f = open(TESTFN)
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)
340
class FileSubclassTests(unittest.TestCase):
    # tests for the behaviour of subclasses of the built-in file type

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                # record that the override (not just file.close) ran
                self.subclass_closed = True
                file.close(self)

        try:
            with C(TESTFN, 'w') as f:
                pass
            self.failUnless(f.subclass_closed)
        finally:
            # the with-block created TESTFN; don't leave it behind
            test_support.unlink(TESTFN)
356
Georg Brandl442b49e2006-06-08 14:50:53 +0000357
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000358class FileThreadingTests(unittest.TestCase):
359 # These tests check the ability to call various methods of file objects
360 # (including close()) concurrently without crashing the Python interpreter.
361 # See #815646, #595601
362
363 def setUp(self):
364 self.f = None
365 self.filename = TESTFN
366 with open(self.filename, "w") as f:
367 f.write("\n".join("0123456789"))
368 self._count_lock = threading.Lock()
369 self.close_count = 0
370 self.close_success_count = 0
371
372 def tearDown(self):
373 if self.f:
374 try:
375 self.f.close()
376 except (EnvironmentError, ValueError):
377 pass
378 try:
379 os.remove(self.filename)
380 except EnvironmentError:
381 pass
382
383 def _create_file(self):
384 self.f = open(self.filename, "w+")
385
386 def _close_file(self):
387 with self._count_lock:
388 self.close_count += 1
389 self.f.close()
390 with self._count_lock:
391 self.close_success_count += 1
392
393 def _close_and_reopen_file(self):
394 self._close_file()
395 # if close raises an exception thats fine, self.f remains valid so
396 # we don't need to reopen.
397 self._create_file()
398
399 def _run_workers(self, func, nb_workers, duration=0.2):
400 with self._count_lock:
401 self.close_count = 0
402 self.close_success_count = 0
403 self.do_continue = True
404 threads = []
405 try:
406 for i in range(nb_workers):
407 t = threading.Thread(target=func)
408 t.start()
409 threads.append(t)
410 for _ in xrange(100):
411 time.sleep(duration/100)
412 with self._count_lock:
413 if self.close_count-self.close_success_count > nb_workers+1:
414 if test_support.verbose:
415 print 'Q',
416 break
417 time.sleep(duration)
418 finally:
419 self.do_continue = False
420 for t in threads:
421 t.join()
422
423 def _test_close_open_io(self, io_func, nb_workers=5):
424 def worker():
425 self._create_file()
426 funcs = itertools.cycle((
427 lambda: io_func(),
428 lambda: self._close_and_reopen_file(),
429 ))
430 for f in funcs:
431 if not self.do_continue:
432 break
433 try:
434 f()
435 except (IOError, ValueError):
436 pass
437 self._run_workers(worker, nb_workers)
438 if test_support.verbose:
439 # Useful verbose statistics when tuning this test to take
440 # less time to run but still ensuring that its still useful.
441 #
442 # the percent of close calls that raised an error
443 percent = 100. - 100.*self.close_success_count/self.close_count
444 print self.close_count, ('%.4f ' % percent),
445
446 def test_close_open(self):
447 def io_func():
448 pass
449 self._test_close_open_io(io_func)
450
451 def test_close_open_flush(self):
452 def io_func():
453 self.f.flush()
454 self._test_close_open_io(io_func)
455
456 def test_close_open_iter(self):
457 def io_func():
458 list(iter(self.f))
459 self._test_close_open_io(io_func)
460
461 def test_close_open_isatty(self):
462 def io_func():
463 self.f.isatty()
464 self._test_close_open_io(io_func)
465
466 def test_close_open_print(self):
467 def io_func():
468 print >> self.f, ''
469 self._test_close_open_io(io_func)
470
471 def test_close_open_read(self):
472 def io_func():
473 self.f.read(0)
474 self._test_close_open_io(io_func)
475
476 def test_close_open_readinto(self):
477 def io_func():
478 a = array('c', 'xxxxx')
479 self.f.readinto(a)
480 self._test_close_open_io(io_func)
481
482 def test_close_open_readline(self):
483 def io_func():
484 self.f.readline()
485 self._test_close_open_io(io_func)
486
487 def test_close_open_readlines(self):
488 def io_func():
489 self.f.readlines()
490 self._test_close_open_io(io_func)
491
492 def test_close_open_seek(self):
493 def io_func():
494 self.f.seek(0, 0)
495 self._test_close_open_io(io_func)
496
497 def test_close_open_tell(self):
498 def io_func():
499 self.f.tell()
500 self._test_close_open_io(io_func)
501
502 def test_close_open_truncate(self):
503 def io_func():
504 self.f.truncate()
505 self._test_close_open_io(io_func)
506
507 def test_close_open_write(self):
508 def io_func():
509 self.f.write('')
510 self._test_close_open_io(io_func)
511
512 def test_close_open_writelines(self):
513 def io_func():
514 self.f.writelines('')
515 self._test_close_open_io(io_func)
516
517
Amaury Forgeot d'Arcbdd941f2008-07-01 20:38:04 +0000518class StdoutTests(unittest.TestCase):
519
520 def test_move_stdout_on_write(self):
521 # Issue 3242: sys.stdout can be replaced (and freed) during a
522 # print statement; prevent a segfault in this case
523 save_stdout = sys.stdout
524
525 class File:
526 def write(self, data):
527 if '\n' in data:
528 sys.stdout = save_stdout
529
530 try:
531 sys.stdout = File()
532 print "some text"
533 finally:
534 sys.stdout = save_stdout
535
Jeffrey Yasskin2d873bd2008-12-08 18:55:24 +0000536 def test_del_stdout_before_print(self):
537 # Issue 4597: 'print' with no argument wasn't reporting when
538 # sys.stdout was deleted.
539 save_stdout = sys.stdout
540 del sys.stdout
541 try:
542 print
543 except RuntimeError as e:
544 self.assertEquals(str(e), "lost sys.stdout")
545 else:
546 self.fail("Expected RuntimeError")
547 finally:
548 sys.stdout = save_stdout
549
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000550
def test_main():
    # Run every test class in this module through run_unittest().
    #
    # Historically, these tests have been sloppy about removing TESTFN.
    # So get rid of it no matter what.
    try:
        test_classes = (
            AutoFileTests,
            OtherFileTests,
            FileSubclassTests,
            FileThreadingTests,
            StdoutTests,
        )
        run_unittest(*test_classes)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

if __name__ == '__main__':
    test_main()