blob: a134a89c0571db1572d95bbad7b5623ee332a95d [file] [log] [blame]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00001import sys
2import os
3import unittest
Antoine Pitrou47a5f482009-06-12 20:41:52 +00004import itertools
5import time
6import threading
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00007from array import array
8from weakref import proxy
9
Antoine Pitrou47a5f482009-06-12 20:41:52 +000010from test import test_support
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000011from test.test_support import TESTFN, findfile, run_unittest
12from UserList import UserList
13
class AutoFileTests(unittest.TestCase):
    # file tests for which a test file is automatically set up:
    # setUp() opens TESTFN for binary writing as self.f, and tearDown()
    # closes it (unless a test reset self.f to None) and removes TESTFN.

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # Remove the file even when close() raises, so a single failure
        # does not leave TESTFN behind for the tests that follow.
        try:
            if self.f:
                self.f.close()
        finally:
            os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Drop the only strong reference so that the proxy goes dead;
        # this also tells tearDown() there is nothing left to close.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        # verify softspace is writable
        f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto: only the first n items of the array are
        # compared, n being the count returned by readinto()
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEqual('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testRepr(self):
        # verify repr works
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))

    def testErrors(self):
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # every method listed here is called without arguments on the
        # closed file and must raise ValueError
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', 'xreadlines', '__iter__']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertEqual(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1/0
        except ZeroDivisionError:
            # only the deliberately provoked exception is wanted here
            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        # self.f was opened in 'wb' mode by setUp()
        self.assertRaises(IOError, self.f.read)
125
class OtherFileTests(unittest.TestCase):
    # file tests that open (and are responsible for removing) TESTFN
    # themselves; there is no setUp()/tearDown() here.

    def testOpenDir(self):
        this_dir = os.path.dirname(__file__)
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                # don't leak the handle when reporting the failure
                f.close()
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # close and remove the file even when the assertion fails
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg[0] != 0:
                s = str(msg)
                # the message must mention the mode but not the file name
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg[0] == 0, we're probably on Windows where there may be
            # no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # Start from a clean slate.  TESTFN only exists here if an earlier
        # test left it behind, so don't depend on test ordering.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            f = open(TESTFN,'rb+')
            data = f.read(5)
            if data != '12345':
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = open(TESTFN)
                if f.next() != filler:
                    # this must be a call: the bare expression
                    # `self.fail, "..."` only builds a tuple
                    self.fail("Broken testfile")
                meth = getattr(f, methodname)
                try:
                    meth(*args)
                except ValueError:
                    pass
                else:
                    self.fail("%s%r after next() didn't raise ValueError" %
                              (methodname, args))
                f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = open(TESTFN)
            for i in range(nchunks):
                f.next()
            testline = testlines.pop(0)
            try:
                line = f.readline()
            except ValueError:
                self.fail("readline() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("readline() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            testline = testlines.pop(0)
            buf = array("c", "\x00" * len(testline))
            try:
                f.readinto(buf)
            except ValueError:
                self.fail("readinto() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            line = buf.tostring()
            if line != testline:
                self.fail("readinto() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))

            testline = testlines.pop(0)
            try:
                line = f.read(len(testline))
            except ValueError:
                self.fail("read() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("read() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            try:
                lines = f.readlines()
            except ValueError:
                self.fail("readlines() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if lines != testlines:
                # report the values that were actually compared
                self.fail("readlines() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (lines, testlines))
            # done with this handle; close it before the name is rebound
            f.close()
            # Reading after iteration hit EOF shouldn't hurt either
            f = open(TESTFN)
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)
353
class FileSubclassTests(unittest.TestCase):

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                # record that the override ran, then really close
                self.subclass_closed = True
                file.close(self)

        try:
            with C(TESTFN, 'w') as f:
                pass
            self.assertTrue(f.subclass_closed)
        finally:
            # opening with 'w' creates TESTFN; don't leave it behind
            if os.path.exists(TESTFN):
                os.remove(TESTFN)
369
370
371class FileThreadingTests(unittest.TestCase):
372 # These tests check the ability to call various methods of file objects
373 # (including close()) concurrently without crashing the Python interpreter.
374 # See #815646, #595601
375
376 def setUp(self):
377 self.f = None
378 self.filename = TESTFN
379 with open(self.filename, "w") as f:
380 f.write("\n".join("0123456789"))
381 self._count_lock = threading.Lock()
382 self.close_count = 0
383 self.close_success_count = 0
384
385 def tearDown(self):
386 if self.f:
387 try:
388 self.f.close()
389 except (EnvironmentError, ValueError):
390 pass
391 try:
392 os.remove(self.filename)
393 except EnvironmentError:
394 pass
395
396 def _create_file(self):
397 self.f = open(self.filename, "w+")
398
399 def _close_file(self):
400 with self._count_lock:
401 self.close_count += 1
402 self.f.close()
403 with self._count_lock:
404 self.close_success_count += 1
405
406 def _close_and_reopen_file(self):
407 self._close_file()
408 # if close raises an exception thats fine, self.f remains valid so
409 # we don't need to reopen.
410 self._create_file()
411
412 def _run_workers(self, func, nb_workers, duration=0.2):
413 with self._count_lock:
414 self.close_count = 0
415 self.close_success_count = 0
416 self.do_continue = True
417 threads = []
418 try:
419 for i in range(nb_workers):
420 t = threading.Thread(target=func)
421 t.start()
422 threads.append(t)
423 for _ in xrange(100):
424 time.sleep(duration/100)
425 with self._count_lock:
426 if self.close_count-self.close_success_count > nb_workers+1:
427 if test_support.verbose:
428 print 'Q',
429 break
430 time.sleep(duration)
431 finally:
432 self.do_continue = False
433 for t in threads:
434 t.join()
435
436 def _test_close_open_io(self, io_func, nb_workers=5):
437 def worker():
438 self._create_file()
439 funcs = itertools.cycle((
440 lambda: io_func(),
441 lambda: self._close_and_reopen_file(),
442 ))
443 for f in funcs:
444 if not self.do_continue:
445 break
446 try:
447 f()
448 except (IOError, ValueError):
449 pass
450 self._run_workers(worker, nb_workers)
451 if test_support.verbose:
452 # Useful verbose statistics when tuning this test to take
453 # less time to run but still ensuring that its still useful.
454 #
455 # the percent of close calls that raised an error
456 percent = 100. - 100.*self.close_success_count/self.close_count
457 print self.close_count, ('%.4f ' % percent),
458
459 def test_close_open(self):
460 def io_func():
461 pass
462 self._test_close_open_io(io_func)
463
464 def test_close_open_flush(self):
465 def io_func():
466 self.f.flush()
467 self._test_close_open_io(io_func)
468
469 def test_close_open_iter(self):
470 def io_func():
471 list(iter(self.f))
472 self._test_close_open_io(io_func)
473
474 def test_close_open_isatty(self):
475 def io_func():
476 self.f.isatty()
477 self._test_close_open_io(io_func)
478
479 def test_close_open_print(self):
480 def io_func():
481 print >> self.f, ''
482 self._test_close_open_io(io_func)
483
484 def test_close_open_read(self):
485 def io_func():
486 self.f.read(0)
487 self._test_close_open_io(io_func)
488
489 def test_close_open_readinto(self):
490 def io_func():
491 a = array('c', 'xxxxx')
492 self.f.readinto(a)
493 self._test_close_open_io(io_func)
494
495 def test_close_open_readline(self):
496 def io_func():
497 self.f.readline()
498 self._test_close_open_io(io_func)
499
500 def test_close_open_readlines(self):
501 def io_func():
502 self.f.readlines()
503 self._test_close_open_io(io_func)
504
505 def test_close_open_seek(self):
506 def io_func():
507 self.f.seek(0, 0)
508 self._test_close_open_io(io_func)
509
510 def test_close_open_tell(self):
511 def io_func():
512 self.f.tell()
513 self._test_close_open_io(io_func)
514
515 def test_close_open_truncate(self):
516 def io_func():
517 self.f.truncate()
518 self._test_close_open_io(io_func)
519
520 def test_close_open_write(self):
521 def io_func():
522 self.f.write('')
523 self._test_close_open_io(io_func)
524
525 def test_close_open_writelines(self):
526 def io_func():
527 self.f.writelines('')
528 self._test_close_open_io(io_func)
529
530
531class StdoutTests(unittest.TestCase):
532
533 def test_move_stdout_on_write(self):
534 # Issue 3242: sys.stdout can be replaced (and freed) during a
535 # print statement; prevent a segfault in this case
536 save_stdout = sys.stdout
537
538 class File:
539 def write(self, data):
540 if '\n' in data:
541 sys.stdout = save_stdout
542
543 try:
544 sys.stdout = File()
545 print "some text"
546 finally:
547 sys.stdout = save_stdout
548
549 def test_del_stdout_before_print(self):
550 # Issue 4597: 'print' with no argument wasn't reporting when
551 # sys.stdout was deleted.
552 save_stdout = sys.stdout
553 del sys.stdout
554 try:
555 print
556 except RuntimeError as e:
557 self.assertEquals(str(e), "lost sys.stdout")
558 else:
559 self.fail("Expected RuntimeError")
560 finally:
561 sys.stdout = save_stdout
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000562
563
def test_main():
    """Run all test case classes of this module, then delete TESTFN if present."""
    test_classes = (AutoFileTests, OtherFileTests, FileSubclassTests,
                    FileThreadingTests, StdoutTests)
    # Historically, these tests have been sloppy about removing TESTFN.
    # So get rid of it no matter what.
    try:
        run_unittest(*test_classes)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

if __name__ == '__main__':
    test_main()