blob: a134a89c0571db1572d95bbad7b5623ee332a95d [file] [log] [blame]
Martin v. Löwisf90ae202002-06-11 06:22:31 +00001import sys
Fred Drake2ec80fa2000-10-23 16:59:35 +00002import os
Georg Brandl442b49e2006-06-08 14:50:53 +00003import unittest
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +00004import itertools
5import time
6import threading
Neal Norwitz62f5a9d2002-04-01 00:09:00 +00007from array import array
Raymond Hettingercb87bc82004-05-31 00:35:52 +00008from weakref import proxy
Fred Drake2ec80fa2000-10-23 16:59:35 +00009
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +000010from test import test_support
Georg Brandl442b49e2006-06-08 14:50:53 +000011from test.test_support import TESTFN, findfile, run_unittest
Marc-André Lemburgfa44d792000-08-25 22:37:31 +000012from UserList import UserList
13
class AutoFileTests(unittest.TestCase):
    # file tests for which a test file is automatically set up

    def setUp(self):
        # Each test starts with TESTFN freshly opened for binary writing.
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # Tests that intentionally drop the file object set self.f to None.
        # Remove TESTFN even if closing the file fails, so that a failure
        # here cannot leak the file into later tests.
        try:
            if self.f:
                self.f.close()
        finally:
            os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEquals(self.f.tell(), p.tell())
        self.f.close()
        # Dropping the last strong reference must invalidate the proxy.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        # verify softspace is writable
        f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        # Only the first n items of the buffer were filled from the file.
        self.assertEquals('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEquals(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testRepr(self):
        # verify repr works
        self.assert_(repr(self.f).startswith("<open file '" + TESTFN))

    def testErrors(self):
        f = self.f
        self.assertEquals(f.name, TESTFN)
        self.assert_(not f.isatty())
        self.assert_(not f.closed)

        # readinto() needs a writable buffer; a str must be rejected.
        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assert_(f.closed)

    def testMethods(self):
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', 'xreadlines', '__iter__']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assert_(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertEquals(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1/0
        except ZeroDivisionError:
            # Catch only the exception raised on purpose above, so that an
            # unrelated error (e.g. KeyboardInterrupt) is not swallowed.
            self.assertEquals(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        # The file is opened write-only in setUp, so reading must fail.
        self.assertRaises(IOError, self.f.read)
Georg Brandl442b49e2006-06-08 14:50:53 +0000125
class OtherFileTests(unittest.TestCase):

    def testOpenDir(self):
        # Opening a directory must raise IOError carrying the directory name.
        # dirname() returns '' when __file__ has no directory component;
        # fall back to the current directory so a real directory is opened.
        this_dir = os.path.dirname(__file__) or os.curdir
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                # Don't leak the handle before reporting the failure.
                f.close()
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                '  Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assert_(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # Clean up even when the assertion above fails.
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg[0] != 0:
                s = str(msg)
                # The message should mention the bad mode, not the filename.
                if s.find(TESTFN) != -1 or s.find(bad_mode) == -1:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg[0] == 0, we're probably on Windows where there may be
            # no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEquals(d, s)

    def testTruncateOnWindows(self):
        # Start from a clean slate.  TESTFN may or may not have been left
        # behind by an earlier test, so don't assume that it exists.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            f = open(TESTFN,'rb+')
            try:
                data = f.read(5)
                if data != '12345':
                    self.fail("Read on file opened for update failed %r" % data)
                if f.tell() != 5:
                    self.fail("File pos after read wrong %d" % f.tell())

                f.truncate()
                if f.tell() != 5:
                    self.fail("File pos after ftruncate wrong %d" % f.tell())
            finally:
                # Close on failure too, so the unlink below can succeed.
                f.close()

            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = open(TESTFN)
                try:
                    if f.next() != filler:
                        self.fail("Broken testfile")
                    meth = getattr(f, methodname)
                    try:
                        meth(*args)
                    except ValueError:
                        pass
                    else:
                        self.fail("%s%r after next() didn't raise ValueError" %
                                  (methodname, args))
                finally:
                    f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = open(TESTFN)
            try:
                for i in range(nchunks):
                    f.next()
                testline = testlines.pop(0)
                try:
                    line = f.readline()
                except ValueError:
                    self.fail("readline() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("readline() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                testline = testlines.pop(0)
                buf = array("c", "\x00" * len(testline))
                try:
                    f.readinto(buf)
                except ValueError:
                    self.fail("readinto() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                line = buf.tostring()
                if line != testline:
                    self.fail("readinto() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                testline = testlines.pop(0)
                try:
                    line = f.read(len(testline))
                except ValueError:
                    self.fail("read() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("read() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                try:
                    lines = f.readlines()
                except ValueError:
                    self.fail("readlines() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if lines != testlines:
                    # Report the values actually compared above.
                    self.fail("readlines() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (lines, testlines))
            finally:
                # Close before the file is reopened below.
                f.close()
            # Reading after iteration hit EOF shouldn't hurt either
            f = open(TESTFN)
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)
353
class FileSubclassTests(unittest.TestCase):

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                # Record that the override ran, then really close the file.
                self.subclass_closed = True
                file.close(self)

        try:
            with C(TESTFN, 'w') as f:
                pass
            self.failUnless(f.subclass_closed)
        finally:
            # Opening in 'w' mode creates TESTFN; don't leave it behind.
            if os.path.exists(TESTFN):
                os.unlink(TESTFN)
369
Georg Brandl442b49e2006-06-08 14:50:53 +0000370
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000371class FileThreadingTests(unittest.TestCase):
372 # These tests check the ability to call various methods of file objects
373 # (including close()) concurrently without crashing the Python interpreter.
374 # See #815646, #595601
375
376 def setUp(self):
377 self.f = None
378 self.filename = TESTFN
379 with open(self.filename, "w") as f:
380 f.write("\n".join("0123456789"))
381 self._count_lock = threading.Lock()
382 self.close_count = 0
383 self.close_success_count = 0
384
385 def tearDown(self):
386 if self.f:
387 try:
388 self.f.close()
389 except (EnvironmentError, ValueError):
390 pass
391 try:
392 os.remove(self.filename)
393 except EnvironmentError:
394 pass
395
396 def _create_file(self):
397 self.f = open(self.filename, "w+")
398
399 def _close_file(self):
400 with self._count_lock:
401 self.close_count += 1
402 self.f.close()
403 with self._count_lock:
404 self.close_success_count += 1
405
406 def _close_and_reopen_file(self):
407 self._close_file()
408 # if close raises an exception thats fine, self.f remains valid so
409 # we don't need to reopen.
410 self._create_file()
411
412 def _run_workers(self, func, nb_workers, duration=0.2):
413 with self._count_lock:
414 self.close_count = 0
415 self.close_success_count = 0
416 self.do_continue = True
417 threads = []
418 try:
419 for i in range(nb_workers):
420 t = threading.Thread(target=func)
421 t.start()
422 threads.append(t)
423 for _ in xrange(100):
424 time.sleep(duration/100)
425 with self._count_lock:
426 if self.close_count-self.close_success_count > nb_workers+1:
427 if test_support.verbose:
428 print 'Q',
429 break
430 time.sleep(duration)
431 finally:
432 self.do_continue = False
433 for t in threads:
434 t.join()
435
436 def _test_close_open_io(self, io_func, nb_workers=5):
437 def worker():
438 self._create_file()
439 funcs = itertools.cycle((
440 lambda: io_func(),
441 lambda: self._close_and_reopen_file(),
442 ))
443 for f in funcs:
444 if not self.do_continue:
445 break
446 try:
447 f()
448 except (IOError, ValueError):
449 pass
450 self._run_workers(worker, nb_workers)
451 if test_support.verbose:
452 # Useful verbose statistics when tuning this test to take
453 # less time to run but still ensuring that its still useful.
454 #
455 # the percent of close calls that raised an error
456 percent = 100. - 100.*self.close_success_count/self.close_count
457 print self.close_count, ('%.4f ' % percent),
458
459 def test_close_open(self):
460 def io_func():
461 pass
462 self._test_close_open_io(io_func)
463
464 def test_close_open_flush(self):
465 def io_func():
466 self.f.flush()
467 self._test_close_open_io(io_func)
468
469 def test_close_open_iter(self):
470 def io_func():
471 list(iter(self.f))
472 self._test_close_open_io(io_func)
473
474 def test_close_open_isatty(self):
475 def io_func():
476 self.f.isatty()
477 self._test_close_open_io(io_func)
478
479 def test_close_open_print(self):
480 def io_func():
481 print >> self.f, ''
482 self._test_close_open_io(io_func)
483
484 def test_close_open_read(self):
485 def io_func():
486 self.f.read(0)
487 self._test_close_open_io(io_func)
488
489 def test_close_open_readinto(self):
490 def io_func():
491 a = array('c', 'xxxxx')
492 self.f.readinto(a)
493 self._test_close_open_io(io_func)
494
495 def test_close_open_readline(self):
496 def io_func():
497 self.f.readline()
498 self._test_close_open_io(io_func)
499
500 def test_close_open_readlines(self):
501 def io_func():
502 self.f.readlines()
503 self._test_close_open_io(io_func)
504
505 def test_close_open_seek(self):
506 def io_func():
507 self.f.seek(0, 0)
508 self._test_close_open_io(io_func)
509
510 def test_close_open_tell(self):
511 def io_func():
512 self.f.tell()
513 self._test_close_open_io(io_func)
514
515 def test_close_open_truncate(self):
516 def io_func():
517 self.f.truncate()
518 self._test_close_open_io(io_func)
519
520 def test_close_open_write(self):
521 def io_func():
522 self.f.write('')
523 self._test_close_open_io(io_func)
524
525 def test_close_open_writelines(self):
526 def io_func():
527 self.f.writelines('')
528 self._test_close_open_io(io_func)
529
530
Amaury Forgeot d'Arcbdd941f2008-07-01 20:38:04 +0000531class StdoutTests(unittest.TestCase):
532
533 def test_move_stdout_on_write(self):
534 # Issue 3242: sys.stdout can be replaced (and freed) during a
535 # print statement; prevent a segfault in this case
536 save_stdout = sys.stdout
537
538 class File:
539 def write(self, data):
540 if '\n' in data:
541 sys.stdout = save_stdout
542
543 try:
544 sys.stdout = File()
545 print "some text"
546 finally:
547 sys.stdout = save_stdout
548
Jeffrey Yasskin2d873bd2008-12-08 18:55:24 +0000549 def test_del_stdout_before_print(self):
550 # Issue 4597: 'print' with no argument wasn't reporting when
551 # sys.stdout was deleted.
552 save_stdout = sys.stdout
553 del sys.stdout
554 try:
555 print
556 except RuntimeError as e:
557 self.assertEquals(str(e), "lost sys.stdout")
558 else:
559 self.fail("Expected RuntimeError")
560 finally:
561 sys.stdout = save_stdout
562
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000563
def test_main():
    # Run every test case class of this module, in this order.
    test_classes = (AutoFileTests, OtherFileTests, FileSubclassTests,
                    FileThreadingTests, StdoutTests)
    try:
        run_unittest(*test_classes)
    finally:
        # Historically, these tests have been sloppy about removing TESTFN.
        # So get rid of it no matter what.
        leftover = os.path.exists(TESTFN)
        if leftover:
            os.unlink(TESTFN)
Georg Brandl442b49e2006-06-08 14:50:53 +0000573
# Run the whole suite when this module is executed as a script.
if __name__ == "__main__":
    test_main()