blob: 5ee037dce30d69ef69a52864af000868d64434ff [file] [log] [blame]
Martin v. Löwisf90ae202002-06-11 06:22:31 +00001import sys
Fred Drake2ec80fa2000-10-23 16:59:35 +00002import os
Georg Brandl442b49e2006-06-08 14:50:53 +00003import unittest
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +00004import itertools
5import time
6import threading
Neal Norwitz62f5a9d2002-04-01 00:09:00 +00007from array import array
Raymond Hettingercb87bc82004-05-31 00:35:52 +00008from weakref import proxy
Fred Drake2ec80fa2000-10-23 16:59:35 +00009
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +000010from test import test_support
Georg Brandl442b49e2006-06-08 14:50:53 +000011from test.test_support import TESTFN, findfile, run_unittest
Marc-André Lemburgfa44d792000-08-25 22:37:31 +000012from UserList import UserList
13
Georg Brandl442b49e2006-06-08 14:50:53 +000014class AutoFileTests(unittest.TestCase):
15 # file tests for which a test file is automatically set up
Raymond Hettingercb87bc82004-05-31 00:35:52 +000016
Georg Brandl442b49e2006-06-08 14:50:53 +000017 def setUp(self):
Tim Petersdbb82f62006-06-09 03:51:41 +000018 self.f = open(TESTFN, 'wb')
Tim Peters015dd822003-05-04 04:16:52 +000019
Georg Brandl442b49e2006-06-08 14:50:53 +000020 def tearDown(self):
Tim Petersdbb82f62006-06-09 03:51:41 +000021 if self.f:
22 self.f.close()
23 os.remove(TESTFN)
Georg Brandl442b49e2006-06-08 14:50:53 +000024
25 def testWeakRefs(self):
26 # verify weak references
27 p = proxy(self.f)
28 p.write('teststring')
29 self.assertEquals(self.f.tell(), p.tell())
30 self.f.close()
31 self.f = None
32 self.assertRaises(ReferenceError, getattr, p, 'tell')
33
34 def testAttributes(self):
35 # verify expected attributes exist
36 f = self.f
37 softspace = f.softspace
38 f.name # merely shouldn't blow up
39 f.mode # ditto
40 f.closed # ditto
41
42 # verify softspace is writable
43 f.softspace = softspace # merely shouldn't blow up
44
45 # verify the others aren't
46 for attr in 'name', 'mode', 'closed':
47 self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
48
49 def testReadinto(self):
50 # verify readinto
51 self.f.write('12')
52 self.f.close()
53 a = array('c', 'x'*10)
54 self.f = open(TESTFN, 'rb')
55 n = self.f.readinto(a)
56 self.assertEquals('12', a.tostring()[:n])
57
58 def testWritelinesUserList(self):
59 # verify writelines with instance sequence
60 l = UserList(['1', '2'])
61 self.f.writelines(l)
62 self.f.close()
63 self.f = open(TESTFN, 'rb')
64 buf = self.f.read()
65 self.assertEquals(buf, '12')
66
67 def testWritelinesIntegers(self):
68 # verify writelines with integers
69 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
70
71 def testWritelinesIntegersUserList(self):
72 # verify writelines with integers in UserList
73 l = UserList([1,2,3])
74 self.assertRaises(TypeError, self.f.writelines, l)
75
76 def testWritelinesNonString(self):
77 # verify writelines with non-string object
Tim Petersdbb82f62006-06-09 03:51:41 +000078 class NonString:
79 pass
Georg Brandl442b49e2006-06-08 14:50:53 +000080
Tim Petersdbb82f62006-06-09 03:51:41 +000081 self.assertRaises(TypeError, self.f.writelines,
82 [NonString(), NonString()])
Georg Brandl442b49e2006-06-08 14:50:53 +000083
84 def testRepr(self):
85 # verify repr works
86 self.assert_(repr(self.f).startswith("<open file '" + TESTFN))
87
88 def testErrors(self):
Antoine Pitrou24837282010-02-05 17:11:32 +000089 self.f.close()
90 self.f = open(TESTFN, 'rb')
Georg Brandl442b49e2006-06-08 14:50:53 +000091 f = self.f
92 self.assertEquals(f.name, TESTFN)
93 self.assert_(not f.isatty())
94 self.assert_(not f.closed)
Tim Peters520d8dd2006-06-09 02:11:02 +000095
Georg Brandl442b49e2006-06-08 14:50:53 +000096 self.assertRaises(TypeError, f.readinto, "")
97 f.close()
98 self.assert_(f.closed)
99
100 def testMethods(self):
101 methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
Tim Petersdbb82f62006-06-09 03:51:41 +0000102 'readline', 'readlines', 'seek', 'tell', 'truncate',
103 'write', 'xreadlines', '__iter__']
Georg Brandl442b49e2006-06-08 14:50:53 +0000104 if sys.platform.startswith('atheos'):
105 methods.remove('truncate')
106
Georg Brandle7ec81f2006-06-09 18:29:52 +0000107 # __exit__ should close the file
108 self.f.__exit__(None, None, None)
109 self.assert_(self.f.closed)
Georg Brandl442b49e2006-06-08 14:50:53 +0000110
111 for methodname in methods:
112 method = getattr(self.f, methodname)
113 # should raise on closed file
114 self.assertRaises(ValueError, method)
115 self.assertRaises(ValueError, self.f.writelines, [])
116
Georg Brandle7ec81f2006-06-09 18:29:52 +0000117 # file is closed, __exit__ shouldn't do anything
118 self.assertEquals(self.f.__exit__(None, None, None), None)
119 # it must also return None if an exception was given
120 try:
Ezio Melotti3efafd72010-08-02 18:40:55 +0000121 1 // 0
Georg Brandle7ec81f2006-06-09 18:29:52 +0000122 except:
123 self.assertEquals(self.f.__exit__(*sys.exc_info()), None)
124
Skip Montanaro6a00c742008-12-23 03:35:04 +0000125 def testReadWhenWriting(self):
126 self.assertRaises(IOError, self.f.read)
Georg Brandl442b49e2006-06-08 14:50:53 +0000127
Antoine Pitrou24837282010-02-05 17:11:32 +0000128 def testIssue5677(self):
129 # Remark: Do not perform more than one test per open file,
130 # since that does NOT catch the readline error on Windows.
131 data = 'xxx'
132 for mode in ['w', 'wb', 'a', 'ab']:
133 for attr in ['read', 'readline', 'readlines']:
134 self.f = open(TESTFN, mode)
135 self.f.write(data)
136 self.assertRaises(IOError, getattr(self.f, attr))
137 self.f.close()
138
139 self.f = open(TESTFN, mode)
140 self.f.write(data)
141 self.assertRaises(IOError, lambda: [line for line in self.f])
142 self.f.close()
143
144 self.f = open(TESTFN, mode)
145 self.f.write(data)
146 self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
147 self.f.close()
148
149 for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
150 self.f = open(TESTFN, mode)
151 self.assertRaises(IOError, self.f.write, data)
152 self.f.close()
153
154 self.f = open(TESTFN, mode)
155 self.assertRaises(IOError, self.f.writelines, [data, data])
156 self.f.close()
157
158 self.f = open(TESTFN, mode)
159 self.assertRaises(IOError, self.f.truncate)
160 self.f.close()
161
Georg Brandl442b49e2006-06-08 14:50:53 +0000162class OtherFileTests(unittest.TestCase):
163
Georg Brandl47fe9812009-01-01 15:46:10 +0000164 def testOpenDir(self):
165 this_dir = os.path.dirname(__file__)
166 for mode in (None, "w"):
167 try:
168 if mode:
169 f = open(this_dir, mode)
170 else:
171 f = open(this_dir)
172 except IOError as e:
173 self.assertEqual(e.filename, this_dir)
174 else:
175 self.fail("opening a directory didn't raise an IOError")
176
Georg Brandl442b49e2006-06-08 14:50:53 +0000177 def testModeStrings(self):
178 # check invalid mode strings
179 for mode in ("", "aU", "wU+"):
180 try:
Tim Petersdbb82f62006-06-09 03:51:41 +0000181 f = open(TESTFN, mode)
Georg Brandl442b49e2006-06-08 14:50:53 +0000182 except ValueError:
183 pass
184 else:
185 f.close()
186 self.fail('%r is an invalid file mode' % mode)
187
Amaury Forgeot d'Arc17617a02008-09-25 20:52:56 +0000188 # Some invalid modes fail on Windows, but pass on Unix
189 # Issue3965: avoid a crash on Windows when filename is unicode
190 for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
191 try:
192 f = open(name, "rr")
193 except IOError:
194 pass
195 else:
196 f.close()
197
Georg Brandl442b49e2006-06-08 14:50:53 +0000198 def testStdin(self):
199 # This causes the interpreter to exit on OSF1 v5.1.
200 if sys.platform != 'osf1V5':
201 self.assertRaises(IOError, sys.stdin.seek, -1)
Thomas Woutersc45251a2006-02-12 11:53:32 +0000202 else:
Georg Brandl442b49e2006-06-08 14:50:53 +0000203 print >>sys.__stdout__, (
204 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
205 ' Test manually.')
206 self.assertRaises(IOError, sys.stdin.truncate)
207
208 def testUnicodeOpen(self):
209 # verify repr works for unicode too
210 f = open(unicode(TESTFN), "w")
211 self.assert_(repr(f).startswith("<open file u'" + TESTFN))
Thomas Woutersc45251a2006-02-12 11:53:32 +0000212 f.close()
Tim Peters0556e9b2006-06-09 04:02:06 +0000213 os.unlink(TESTFN)
Thomas Woutersc45251a2006-02-12 11:53:32 +0000214
Georg Brandl442b49e2006-06-08 14:50:53 +0000215 def testBadModeArgument(self):
216 # verify that we get a sensible error message for bad mode argument
217 bad_mode = "qwerty"
Tim Peterscffcfed2006-02-14 17:41:18 +0000218 try:
Georg Brandl442b49e2006-06-08 14:50:53 +0000219 f = open(TESTFN, bad_mode)
220 except ValueError, msg:
221 if msg[0] != 0:
222 s = str(msg)
Ezio Melottic0fd6ff2010-03-23 13:20:39 +0000223 if TESTFN in s or bad_mode not in s:
Georg Brandl442b49e2006-06-08 14:50:53 +0000224 self.fail("bad error message for invalid mode: %s" % s)
225 # if msg[0] == 0, we're probably on Windows where there may be
226 # no obvious way to discover why open() failed.
227 else:
228 f.close()
229 self.fail("no error for invalid mode: %s" % bad_mode)
230
231 def testSetBufferSize(self):
232 # make sure that explicitly setting the buffer size doesn't cause
233 # misbehaviour especially with repeated close() calls
234 for s in (-1, 0, 1, 512):
235 try:
236 f = open(TESTFN, 'w', s)
237 f.write(str(s))
238 f.close()
239 f.close()
240 f = open(TESTFN, 'r', s)
241 d = int(f.read())
242 f.close()
243 f.close()
244 except IOError, msg:
245 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
246 self.assertEquals(d, s)
247
248 def testTruncateOnWindows(self):
249 os.unlink(TESTFN)
250
251 def bug801631():
252 # SF bug <http://www.python.org/sf/801631>
253 # "file.truncate fault on windows"
Tim Petersdbb82f62006-06-09 03:51:41 +0000254 f = open(TESTFN, 'wb')
Georg Brandl442b49e2006-06-08 14:50:53 +0000255 f.write('12345678901') # 11 bytes
256 f.close()
257
Tim Petersdbb82f62006-06-09 03:51:41 +0000258 f = open(TESTFN,'rb+')
Georg Brandl442b49e2006-06-08 14:50:53 +0000259 data = f.read(5)
260 if data != '12345':
261 self.fail("Read on file opened for update failed %r" % data)
262 if f.tell() != 5:
263 self.fail("File pos after read wrong %d" % f.tell())
264
265 f.truncate()
266 if f.tell() != 5:
267 self.fail("File pos after ftruncate wrong %d" % f.tell())
268
269 f.close()
270 size = os.path.getsize(TESTFN)
271 if size != 5:
272 self.fail("File size after ftruncate wrong %d" % size)
273
274 try:
275 bug801631()
276 finally:
277 os.unlink(TESTFN)
278
279 def testIteration(self):
Tim Petersdbb82f62006-06-09 03:51:41 +0000280 # Test the complex interaction when mixing file-iteration and the
281 # various read* methods. Ostensibly, the mixture could just be tested
282 # to work when it should work according to the Python language,
283 # instead of fail when it should fail according to the current CPython
284 # implementation. People don't always program Python the way they
285 # should, though, and the implemenation might change in subtle ways,
286 # so we explicitly test for errors, too; the test will just have to
287 # be updated when the implementation changes.
Georg Brandl442b49e2006-06-08 14:50:53 +0000288 dataoffset = 16384
289 filler = "ham\n"
290 assert not dataoffset % len(filler), \
291 "dataoffset must be multiple of len(filler)"
292 nchunks = dataoffset // len(filler)
293 testlines = [
294 "spam, spam and eggs\n",
295 "eggs, spam, ham and spam\n",
296 "saussages, spam, spam and eggs\n",
297 "spam, ham, spam and eggs\n",
298 "spam, spam, spam, spam, spam, ham, spam\n",
299 "wonderful spaaaaaam.\n"
300 ]
301 methods = [("readline", ()), ("read", ()), ("readlines", ()),
302 ("readinto", (array("c", " "*100),))]
303
304 try:
305 # Prepare the testfile
306 bag = open(TESTFN, "w")
307 bag.write(filler * nchunks)
308 bag.writelines(testlines)
309 bag.close()
310 # Test for appropriate errors mixing read* and iteration
311 for methodname, args in methods:
312 f = open(TESTFN)
313 if f.next() != filler:
314 self.fail, "Broken testfile"
315 meth = getattr(f, methodname)
316 try:
317 meth(*args)
318 except ValueError:
319 pass
320 else:
321 self.fail("%s%r after next() didn't raise ValueError" %
322 (methodname, args))
323 f.close()
324
Tim Petersdbb82f62006-06-09 03:51:41 +0000325 # Test to see if harmless (by accident) mixing of read* and
326 # iteration still works. This depends on the size of the internal
327 # iteration buffer (currently 8192,) but we can test it in a
328 # flexible manner. Each line in the bag o' ham is 4 bytes
329 # ("h", "a", "m", "\n"), so 4096 lines of that should get us
330 # exactly on the buffer boundary for any power-of-2 buffersize
331 # between 4 and 16384 (inclusive).
Georg Brandl442b49e2006-06-08 14:50:53 +0000332 f = open(TESTFN)
333 for i in range(nchunks):
334 f.next()
335 testline = testlines.pop(0)
336 try:
337 line = f.readline()
338 except ValueError:
339 self.fail("readline() after next() with supposedly empty "
340 "iteration-buffer failed anyway")
341 if line != testline:
342 self.fail("readline() after next() with empty buffer "
343 "failed. Got %r, expected %r" % (line, testline))
344 testline = testlines.pop(0)
345 buf = array("c", "\x00" * len(testline))
346 try:
347 f.readinto(buf)
348 except ValueError:
349 self.fail("readinto() after next() with supposedly empty "
350 "iteration-buffer failed anyway")
351 line = buf.tostring()
352 if line != testline:
353 self.fail("readinto() after next() with empty buffer "
354 "failed. Got %r, expected %r" % (line, testline))
355
356 testline = testlines.pop(0)
357 try:
358 line = f.read(len(testline))
359 except ValueError:
360 self.fail("read() after next() with supposedly empty "
361 "iteration-buffer failed anyway")
362 if line != testline:
363 self.fail("read() after next() with empty buffer "
364 "failed. Got %r, expected %r" % (line, testline))
365 try:
366 lines = f.readlines()
367 except ValueError:
368 self.fail("readlines() after next() with supposedly empty "
369 "iteration-buffer failed anyway")
370 if lines != testlines:
371 self.fail("readlines() after next() with empty buffer "
372 "failed. Got %r, expected %r" % (line, testline))
373 # Reading after iteration hit EOF shouldn't hurt either
374 f = open(TESTFN)
375 try:
376 for line in f:
377 pass
378 try:
379 f.readline()
380 f.readinto(buf)
381 f.read()
382 f.readlines()
383 except ValueError:
384 self.fail("read* failed after next() consumed file")
385 finally:
386 f.close()
387 finally:
388 os.unlink(TESTFN)
389
Georg Brandlad61bc82008-02-23 15:11:18 +0000390class FileSubclassTests(unittest.TestCase):
391
392 def testExit(self):
393 # test that exiting with context calls subclass' close
394 class C(file):
395 def __init__(self, *args):
396 self.subclass_closed = False
397 file.__init__(self, *args)
398 def close(self):
399 self.subclass_closed = True
400 file.close(self)
401
402 with C(TESTFN, 'w') as f:
403 pass
404 self.failUnless(f.subclass_closed)
405
Georg Brandl442b49e2006-06-08 14:50:53 +0000406
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000407class FileThreadingTests(unittest.TestCase):
408 # These tests check the ability to call various methods of file objects
409 # (including close()) concurrently without crashing the Python interpreter.
410 # See #815646, #595601
411
412 def setUp(self):
413 self.f = None
414 self.filename = TESTFN
415 with open(self.filename, "w") as f:
416 f.write("\n".join("0123456789"))
417 self._count_lock = threading.Lock()
418 self.close_count = 0
419 self.close_success_count = 0
Antoine Pitrou5de15942010-05-17 20:00:52 +0000420 self.use_buffering = False
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000421
422 def tearDown(self):
423 if self.f:
424 try:
425 self.f.close()
426 except (EnvironmentError, ValueError):
427 pass
428 try:
429 os.remove(self.filename)
430 except EnvironmentError:
431 pass
432
433 def _create_file(self):
Antoine Pitrou5de15942010-05-17 20:00:52 +0000434 if self.use_buffering:
435 self.f = open(self.filename, "w+", buffering=1024*16)
436 else:
437 self.f = open(self.filename, "w+")
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000438
439 def _close_file(self):
440 with self._count_lock:
441 self.close_count += 1
442 self.f.close()
443 with self._count_lock:
444 self.close_success_count += 1
445
446 def _close_and_reopen_file(self):
447 self._close_file()
448 # if close raises an exception thats fine, self.f remains valid so
449 # we don't need to reopen.
450 self._create_file()
451
452 def _run_workers(self, func, nb_workers, duration=0.2):
453 with self._count_lock:
454 self.close_count = 0
455 self.close_success_count = 0
456 self.do_continue = True
457 threads = []
458 try:
459 for i in range(nb_workers):
460 t = threading.Thread(target=func)
461 t.start()
462 threads.append(t)
463 for _ in xrange(100):
464 time.sleep(duration/100)
465 with self._count_lock:
466 if self.close_count-self.close_success_count > nb_workers+1:
467 if test_support.verbose:
468 print 'Q',
469 break
470 time.sleep(duration)
471 finally:
472 self.do_continue = False
473 for t in threads:
474 t.join()
475
476 def _test_close_open_io(self, io_func, nb_workers=5):
477 def worker():
478 self._create_file()
479 funcs = itertools.cycle((
480 lambda: io_func(),
481 lambda: self._close_and_reopen_file(),
482 ))
483 for f in funcs:
484 if not self.do_continue:
485 break
486 try:
487 f()
488 except (IOError, ValueError):
489 pass
490 self._run_workers(worker, nb_workers)
491 if test_support.verbose:
492 # Useful verbose statistics when tuning this test to take
493 # less time to run but still ensuring that its still useful.
494 #
495 # the percent of close calls that raised an error
496 percent = 100. - 100.*self.close_success_count/self.close_count
497 print self.close_count, ('%.4f ' % percent),
498
499 def test_close_open(self):
500 def io_func():
501 pass
502 self._test_close_open_io(io_func)
503
504 def test_close_open_flush(self):
505 def io_func():
506 self.f.flush()
507 self._test_close_open_io(io_func)
508
509 def test_close_open_iter(self):
510 def io_func():
511 list(iter(self.f))
512 self._test_close_open_io(io_func)
513
514 def test_close_open_isatty(self):
515 def io_func():
516 self.f.isatty()
517 self._test_close_open_io(io_func)
518
519 def test_close_open_print(self):
520 def io_func():
521 print >> self.f, ''
522 self._test_close_open_io(io_func)
523
Antoine Pitrou5de15942010-05-17 20:00:52 +0000524 def test_close_open_print_buffered(self):
525 self.use_buffering = True
526 def io_func():
527 print >> self.f, ''
528 self._test_close_open_io(io_func)
529
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000530 def test_close_open_read(self):
531 def io_func():
532 self.f.read(0)
533 self._test_close_open_io(io_func)
534
535 def test_close_open_readinto(self):
536 def io_func():
537 a = array('c', 'xxxxx')
538 self.f.readinto(a)
539 self._test_close_open_io(io_func)
540
541 def test_close_open_readline(self):
542 def io_func():
543 self.f.readline()
544 self._test_close_open_io(io_func)
545
546 def test_close_open_readlines(self):
547 def io_func():
548 self.f.readlines()
549 self._test_close_open_io(io_func)
550
551 def test_close_open_seek(self):
552 def io_func():
553 self.f.seek(0, 0)
554 self._test_close_open_io(io_func)
555
556 def test_close_open_tell(self):
557 def io_func():
558 self.f.tell()
559 self._test_close_open_io(io_func)
560
561 def test_close_open_truncate(self):
562 def io_func():
563 self.f.truncate()
564 self._test_close_open_io(io_func)
565
566 def test_close_open_write(self):
567 def io_func():
568 self.f.write('')
569 self._test_close_open_io(io_func)
570
571 def test_close_open_writelines(self):
572 def io_func():
573 self.f.writelines('')
574 self._test_close_open_io(io_func)
575
576
Amaury Forgeot d'Arcbdd941f2008-07-01 20:38:04 +0000577class StdoutTests(unittest.TestCase):
578
579 def test_move_stdout_on_write(self):
580 # Issue 3242: sys.stdout can be replaced (and freed) during a
581 # print statement; prevent a segfault in this case
582 save_stdout = sys.stdout
583
584 class File:
585 def write(self, data):
586 if '\n' in data:
587 sys.stdout = save_stdout
588
589 try:
590 sys.stdout = File()
591 print "some text"
592 finally:
593 sys.stdout = save_stdout
594
Jeffrey Yasskin69614982008-12-11 05:21:18 +0000595 def test_del_stdout_before_print(self):
596 # Issue 4597: 'print' with no argument wasn't reporting when
597 # sys.stdout was deleted.
598 save_stdout = sys.stdout
599 del sys.stdout
600 try:
601 print
602 except RuntimeError as e:
603 self.assertEquals(str(e), "lost sys.stdout")
604 else:
605 self.fail("Expected RuntimeError")
606 finally:
607 sys.stdout = save_stdout
608
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000609
Georg Brandl442b49e2006-06-08 14:50:53 +0000610def test_main():
Neal Norwitzc9778a82006-06-09 05:54:18 +0000611 # Historically, these tests have been sloppy about removing TESTFN.
612 # So get rid of it no matter what.
Tim Peters0556e9b2006-06-09 04:02:06 +0000613 try:
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000614 run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
Amaury Forgeot d'Arcbdd941f2008-07-01 20:38:04 +0000615 FileThreadingTests, StdoutTests)
Tim Peters0556e9b2006-06-09 04:02:06 +0000616 finally:
617 if os.path.exists(TESTFN):
618 os.unlink(TESTFN)
Georg Brandl442b49e2006-06-08 14:50:53 +0000619
620if __name__ == '__main__':
621 test_main()