blob: c5e5ea16df7d2aa4db8bf9e47bd94e8f971afa1f [file] [log] [blame]
Martin v. Löwisf90ae202002-06-11 06:22:31 +00001import sys
Fred Drake2ec80fa2000-10-23 16:59:35 +00002import os
Georg Brandl442b49e2006-06-08 14:50:53 +00003import unittest
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +00004import itertools
5import time
6import threading
Neal Norwitz62f5a9d2002-04-01 00:09:00 +00007from array import array
Raymond Hettingercb87bc82004-05-31 00:35:52 +00008from weakref import proxy
Fred Drake2ec80fa2000-10-23 16:59:35 +00009
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +000010from test import test_support
Georg Brandl442b49e2006-06-08 14:50:53 +000011from test.test_support import TESTFN, findfile, run_unittest
Marc-André Lemburgfa44d792000-08-25 22:37:31 +000012from UserList import UserList
13
class AutoFileTests(unittest.TestCase):
    # File tests for which a test file is automatically set up: setUp opens
    # TESTFN for binary writing as self.f, tearDown closes and removes it.

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # Tests that deliberately drop their file set self.f to None.
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Dropping the last strong reference must invalidate the proxy.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        # verify softspace is writable
        f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEqual('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testRepr(self):
        # verify repr works
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))

    def testErrors(self):
        # Reopen the file read-only before probing it.
        self.f.close()
        self.f = open(TESTFN, 'rb')
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', 'xreadlines', '__iter__']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertEqual(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1/0
        except ZeroDivisionError:
            # Only the deliberately provoked error is expected here; anything
            # else should propagate instead of being silently swallowed.
            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        self.assertRaises(IOError, self.f.read)

    def testIssue5677(self):
        # Remark: Do not perform more than one test per open file,
        # since that does NOT catch the readline error on Windows.
        data = 'xxx'

        # This test manages its own handles; release the one setUp opened
        # instead of leaving it to be reclaimed when self.f is rebound.
        self.f.close()

        # Reading from a file opened for writing/appending must fail.
        for mode in ['w', 'wb', 'a', 'ab']:
            for attr in ['read', 'readline', 'readlines']:
                self.f = open(TESTFN, mode)
                self.f.write(data)
                self.assertRaises(IOError, getattr(self.f, attr))
                self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, lambda: [line for line in self.f])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
            self.f.close()

        # Writing to a file opened for reading must fail.
        for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.write, data)
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.writelines, [data, data])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.truncate)
            self.f.close()
161
class OtherFileTests(unittest.TestCase):
    # File tests that create (and are responsible for removing) TESTFN
    # themselves; no test here relies on a file left behind by another one.

    def testOpenDir(self):
        # dirname() yields '' when the module is run as a script from its own
        # directory; fall back to os.curdir so a real directory is opened.
        this_dir = os.path.dirname(__file__) or os.curdir
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                f.close()
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except IOError:
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # Clean up even when the assertion fails.
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        try:
            for s in (-1, 0, 1, 512):
                try:
                    f = open(TESTFN, 'w', s)
                    f.write(str(s))
                    f.close()
                    f.close()
                    f = open(TESTFN, 'r', s)
                    d = int(f.read())
                    f.close()
                    f.close()
                except IOError as msg:
                    self.fail('error setting buffer size %d: %s' %
                              (s, str(msg)))
                self.assertEqual(d, s)
        finally:
            # Do not leave the scratch file behind for later tests.
            if os.path.exists(TESTFN):
                os.unlink(TESTFN)

    def testTruncateOnWindows(self):
        # Start from a clean slate without requiring that an earlier test
        # happened to leave TESTFN on disk.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            f = open(TESTFN,'rb+')
            try:
                data = f.read(5)
                if data != '12345':
                    self.fail("Read on file opened for update failed %r" % data)
                if f.tell() != 5:
                    self.fail("File pos after read wrong %d" % f.tell())

                f.truncate()
                if f.tell() != 5:
                    self.fail("File pos after ftruncate wrong %d" % f.tell())
            finally:
                # An open handle would make the unlink below fail on Windows.
                f.close()

            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                with open(TESTFN) as f:
                    if f.next() != filler:
                        self.fail("Broken testfile")
                    meth = getattr(f, methodname)
                    try:
                        meth(*args)
                    except ValueError:
                        pass
                    else:
                        self.fail("%s%r after next() didn't raise ValueError" %
                                  (methodname, args))

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            with open(TESTFN) as f:
                for i in range(nchunks):
                    f.next()
                testline = testlines.pop(0)
                try:
                    line = f.readline()
                except ValueError:
                    self.fail("readline() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("readline() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                testline = testlines.pop(0)
                buf = array("c", "\x00" * len(testline))
                try:
                    f.readinto(buf)
                except ValueError:
                    self.fail("readinto() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                line = buf.tostring()
                if line != testline:
                    self.fail("readinto() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                testline = testlines.pop(0)
                try:
                    line = f.read(len(testline))
                except ValueError:
                    self.fail("read() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("read() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                try:
                    lines = f.readlines()
                except ValueError:
                    self.fail("readlines() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if lines != testlines:
                    # Report the values that were actually compared.
                    self.fail("readlines() after next() with empty buffer "
                              "failed. Got %r, expected %r" %
                              (lines, testlines))
            # Reading after iteration hit EOF shouldn't hurt either
            with open(TESTFN) as f:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
        finally:
            os.unlink(TESTFN)
389
class FileSubclassTests(unittest.TestCase):
    # Tests for user-defined subclasses of the built-in file type.

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                self.subclass_closed = True
                file.close(self)

        try:
            with C(TESTFN, 'w') as f:
                pass
            self.assertTrue(f.subclass_closed)
        finally:
            # Opening in 'w' mode created TESTFN; do not leave it behind.
            if os.path.exists(TESTFN):
                os.unlink(TESTFN)
405
Georg Brandl442b49e2006-06-08 14:50:53 +0000406
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000407class FileThreadingTests(unittest.TestCase):
408 # These tests check the ability to call various methods of file objects
409 # (including close()) concurrently without crashing the Python interpreter.
410 # See #815646, #595601
411
412 def setUp(self):
413 self.f = None
414 self.filename = TESTFN
415 with open(self.filename, "w") as f:
416 f.write("\n".join("0123456789"))
417 self._count_lock = threading.Lock()
418 self.close_count = 0
419 self.close_success_count = 0
420
421 def tearDown(self):
422 if self.f:
423 try:
424 self.f.close()
425 except (EnvironmentError, ValueError):
426 pass
427 try:
428 os.remove(self.filename)
429 except EnvironmentError:
430 pass
431
432 def _create_file(self):
433 self.f = open(self.filename, "w+")
434
435 def _close_file(self):
436 with self._count_lock:
437 self.close_count += 1
438 self.f.close()
439 with self._count_lock:
440 self.close_success_count += 1
441
442 def _close_and_reopen_file(self):
443 self._close_file()
444 # if close raises an exception thats fine, self.f remains valid so
445 # we don't need to reopen.
446 self._create_file()
447
448 def _run_workers(self, func, nb_workers, duration=0.2):
449 with self._count_lock:
450 self.close_count = 0
451 self.close_success_count = 0
452 self.do_continue = True
453 threads = []
454 try:
455 for i in range(nb_workers):
456 t = threading.Thread(target=func)
457 t.start()
458 threads.append(t)
459 for _ in xrange(100):
460 time.sleep(duration/100)
461 with self._count_lock:
462 if self.close_count-self.close_success_count > nb_workers+1:
463 if test_support.verbose:
464 print 'Q',
465 break
466 time.sleep(duration)
467 finally:
468 self.do_continue = False
469 for t in threads:
470 t.join()
471
472 def _test_close_open_io(self, io_func, nb_workers=5):
473 def worker():
474 self._create_file()
475 funcs = itertools.cycle((
476 lambda: io_func(),
477 lambda: self._close_and_reopen_file(),
478 ))
479 for f in funcs:
480 if not self.do_continue:
481 break
482 try:
483 f()
484 except (IOError, ValueError):
485 pass
486 self._run_workers(worker, nb_workers)
487 if test_support.verbose:
488 # Useful verbose statistics when tuning this test to take
489 # less time to run but still ensuring that its still useful.
490 #
491 # the percent of close calls that raised an error
492 percent = 100. - 100.*self.close_success_count/self.close_count
493 print self.close_count, ('%.4f ' % percent),
494
495 def test_close_open(self):
496 def io_func():
497 pass
498 self._test_close_open_io(io_func)
499
500 def test_close_open_flush(self):
501 def io_func():
502 self.f.flush()
503 self._test_close_open_io(io_func)
504
505 def test_close_open_iter(self):
506 def io_func():
507 list(iter(self.f))
508 self._test_close_open_io(io_func)
509
510 def test_close_open_isatty(self):
511 def io_func():
512 self.f.isatty()
513 self._test_close_open_io(io_func)
514
515 def test_close_open_print(self):
516 def io_func():
517 print >> self.f, ''
518 self._test_close_open_io(io_func)
519
520 def test_close_open_read(self):
521 def io_func():
522 self.f.read(0)
523 self._test_close_open_io(io_func)
524
525 def test_close_open_readinto(self):
526 def io_func():
527 a = array('c', 'xxxxx')
528 self.f.readinto(a)
529 self._test_close_open_io(io_func)
530
531 def test_close_open_readline(self):
532 def io_func():
533 self.f.readline()
534 self._test_close_open_io(io_func)
535
536 def test_close_open_readlines(self):
537 def io_func():
538 self.f.readlines()
539 self._test_close_open_io(io_func)
540
541 def test_close_open_seek(self):
542 def io_func():
543 self.f.seek(0, 0)
544 self._test_close_open_io(io_func)
545
546 def test_close_open_tell(self):
547 def io_func():
548 self.f.tell()
549 self._test_close_open_io(io_func)
550
551 def test_close_open_truncate(self):
552 def io_func():
553 self.f.truncate()
554 self._test_close_open_io(io_func)
555
556 def test_close_open_write(self):
557 def io_func():
558 self.f.write('')
559 self._test_close_open_io(io_func)
560
561 def test_close_open_writelines(self):
562 def io_func():
563 self.f.writelines('')
564 self._test_close_open_io(io_func)
565
566
Amaury Forgeot d'Arcbdd941f2008-07-01 20:38:04 +0000567class StdoutTests(unittest.TestCase):
568
569 def test_move_stdout_on_write(self):
570 # Issue 3242: sys.stdout can be replaced (and freed) during a
571 # print statement; prevent a segfault in this case
572 save_stdout = sys.stdout
573
574 class File:
575 def write(self, data):
576 if '\n' in data:
577 sys.stdout = save_stdout
578
579 try:
580 sys.stdout = File()
581 print "some text"
582 finally:
583 sys.stdout = save_stdout
584
Jeffrey Yasskin69614982008-12-11 05:21:18 +0000585 def test_del_stdout_before_print(self):
586 # Issue 4597: 'print' with no argument wasn't reporting when
587 # sys.stdout was deleted.
588 save_stdout = sys.stdout
589 del sys.stdout
590 try:
591 print
592 except RuntimeError as e:
593 self.assertEquals(str(e), "lost sys.stdout")
594 else:
595 self.fail("Expected RuntimeError")
596 finally:
597 sys.stdout = save_stdout
598
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000599
def test_main():
    """Run every test class in this module, then remove TESTFN if present."""
    # Historically, these tests have been sloppy about removing TESTFN.
    # So get rid of it no matter what.
    test_classes = (
        AutoFileTests,
        OtherFileTests,
        FileSubclassTests,
        FileThreadingTests,
        StdoutTests,
    )
    try:
        run_unittest(*test_classes)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)


if __name__ == '__main__':
    test_main()