blob: b93bdbd9d0a5bd28c4b8e7ad64e945890d840c29 [file] [log] [blame]
Martin v. Löwisf90ae202002-06-11 06:22:31 +00001import sys
Fred Drake2ec80fa2000-10-23 16:59:35 +00002import os
Georg Brandl442b49e2006-06-08 14:50:53 +00003import unittest
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +00004import itertools
5import time
6import threading
Neal Norwitz62f5a9d2002-04-01 00:09:00 +00007from array import array
Raymond Hettingercb87bc82004-05-31 00:35:52 +00008from weakref import proxy
Fred Drake2ec80fa2000-10-23 16:59:35 +00009
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +000010from test import test_support
Georg Brandl442b49e2006-06-08 14:50:53 +000011from test.test_support import TESTFN, findfile, run_unittest
Marc-André Lemburgfa44d792000-08-25 22:37:31 +000012from UserList import UserList
13
class AutoFileTests(unittest.TestCase):
    # file tests for which a test file is automatically set up:
    # setUp opens TESTFN in 'wb' mode as self.f, tearDown closes it
    # (unless a test reset self.f to None) and removes TESTFN.

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # Remove TESTFN even if close() raises, so one failing test does
        # not leave the file behind for the next one.
        try:
            if self.f:
                self.f.close()
        finally:
            os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEquals(self.f.tell(), p.tell())
        self.f.close()
        # Drop the only strong reference so the proxy becomes dead.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        # verify softspace is writable
        f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEquals('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEquals(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testRepr(self):
        # verify repr works
        self.assert_(repr(self.f).startswith("<open file '" + TESTFN))

    def testErrors(self):
        f = self.f
        self.assertEquals(f.name, TESTFN)
        self.assert_(not f.isatty())
        self.assert_(not f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assert_(f.closed)

    def testMethods(self):
        # every listed method must raise ValueError once the file is closed
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', 'xreadlines', '__iter__']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assert_(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertEquals(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1/0
        except ZeroDivisionError:
            # Only the deliberately provoked error is expected here; a
            # bare except would also hide unrelated failures.
            self.assertEquals(self.f.__exit__(*sys.exc_info()), None)
122
Georg Brandl442b49e2006-06-08 14:50:53 +0000123
class OtherFileTests(unittest.TestCase):
    # file tests that create (and are responsible for removing) TESTFN
    # themselves; there is no shared setUp/tearDown.

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except IOError:
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                '  Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        self.assert_(repr(f).startswith("<open file u'" + TESTFN))
        f.close()
        os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if s.find(TESTFN) != -1 or s.find(bad_mode) == -1:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there
            # may be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEquals(d, s)

    def testTruncateOnWindows(self):
        # Start from a clean slate.  TESTFN may or may not exist depending
        # on which tests ran before this one, so do not unlink blindly.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            try:
                f.write('12345678901')   # 11 bytes
            finally:
                f.close()

            f = open(TESTFN,'rb+')
            try:
                data = f.read(5)
                if data != '12345':
                    self.fail("Read on file opened for update failed %r" % data)
                if f.tell() != 5:
                    self.fail("File pos after read wrong %d" % f.tell())

                f.truncate()
                if f.tell() != 5:
                    self.fail("File pos after ftruncate wrong %d" % f.tell())
            finally:
                # Close even when a check above fails; an open handle
                # would make the unlink below fail on Windows.
                f.close()

            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = open(TESTFN)
                if f.next() != filler:
                    # Must be a call: the bare expression
                    # `self.fail, "..."` only builds a tuple.
                    self.fail("Broken testfile")
                meth = getattr(f, methodname)
                try:
                    meth(*args)
                except ValueError:
                    pass
                else:
                    self.fail("%s%r after next() didn't raise ValueError" %
                                     (methodname, args))
                f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = open(TESTFN)
            for i in range(nchunks):
                f.next()
            testline = testlines.pop(0)
            try:
                line = f.readline()
            except ValueError:
                self.fail("readline() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("readline() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            testline = testlines.pop(0)
            buf = array("c", "\x00" * len(testline))
            try:
                f.readinto(buf)
            except ValueError:
                self.fail("readinto() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            line = buf.tostring()
            if line != testline:
                self.fail("readinto() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))

            testline = testlines.pop(0)
            try:
                line = f.read(len(testline))
            except ValueError:
                self.fail("read() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if line != testline:
                self.fail("read() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (line, testline))
            try:
                lines = f.readlines()
            except ValueError:
                self.fail("readlines() after next() with supposedly empty "
                          "iteration-buffer failed anyway")
            if lines != testlines:
                # Report the values actually compared (the remaining
                # lines), not the leftovers of the previous check.
                self.fail("readlines() after next() with empty buffer "
                          "failed. Got %r, expected %r" % (lines, testlines))
            f.close()
            # Reading after iteration hit EOF shouldn't hurt either
            f = open(TESTFN)
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)
338
class FileSubclassTests(unittest.TestCase):
    # tests for subclasses of the built-in file type

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                self.subclass_closed = True
                file.close(self)

        try:
            with C(TESTFN, 'w') as f:
                pass
            self.failUnless(f.subclass_closed)
        finally:
            # Opening in 'w' mode creates TESTFN; do not leave it behind
            # for whichever test runs next.
            if os.path.exists(TESTFN):
                os.unlink(TESTFN)
354
Georg Brandl442b49e2006-06-08 14:50:53 +0000355
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000356class FileThreadingTests(unittest.TestCase):
357 # These tests check the ability to call various methods of file objects
358 # (including close()) concurrently without crashing the Python interpreter.
359 # See #815646, #595601
360
361 def setUp(self):
362 self.f = None
363 self.filename = TESTFN
364 with open(self.filename, "w") as f:
365 f.write("\n".join("0123456789"))
366 self._count_lock = threading.Lock()
367 self.close_count = 0
368 self.close_success_count = 0
369
370 def tearDown(self):
371 if self.f:
372 try:
373 self.f.close()
374 except (EnvironmentError, ValueError):
375 pass
376 try:
377 os.remove(self.filename)
378 except EnvironmentError:
379 pass
380
381 def _create_file(self):
382 self.f = open(self.filename, "w+")
383
384 def _close_file(self):
385 with self._count_lock:
386 self.close_count += 1
387 self.f.close()
388 with self._count_lock:
389 self.close_success_count += 1
390
391 def _close_and_reopen_file(self):
392 self._close_file()
393 # if close raises an exception thats fine, self.f remains valid so
394 # we don't need to reopen.
395 self._create_file()
396
397 def _run_workers(self, func, nb_workers, duration=0.2):
398 with self._count_lock:
399 self.close_count = 0
400 self.close_success_count = 0
401 self.do_continue = True
402 threads = []
403 try:
404 for i in range(nb_workers):
405 t = threading.Thread(target=func)
406 t.start()
407 threads.append(t)
408 for _ in xrange(100):
409 time.sleep(duration/100)
410 with self._count_lock:
411 if self.close_count-self.close_success_count > nb_workers+1:
412 if test_support.verbose:
413 print 'Q',
414 break
415 time.sleep(duration)
416 finally:
417 self.do_continue = False
418 for t in threads:
419 t.join()
420
421 def _test_close_open_io(self, io_func, nb_workers=5):
422 def worker():
423 self._create_file()
424 funcs = itertools.cycle((
425 lambda: io_func(),
426 lambda: self._close_and_reopen_file(),
427 ))
428 for f in funcs:
429 if not self.do_continue:
430 break
431 try:
432 f()
433 except (IOError, ValueError):
434 pass
435 self._run_workers(worker, nb_workers)
436 if test_support.verbose:
437 # Useful verbose statistics when tuning this test to take
438 # less time to run but still ensuring that its still useful.
439 #
440 # the percent of close calls that raised an error
441 percent = 100. - 100.*self.close_success_count/self.close_count
442 print self.close_count, ('%.4f ' % percent),
443
444 def test_close_open(self):
445 def io_func():
446 pass
447 self._test_close_open_io(io_func)
448
449 def test_close_open_flush(self):
450 def io_func():
451 self.f.flush()
452 self._test_close_open_io(io_func)
453
454 def test_close_open_iter(self):
455 def io_func():
456 list(iter(self.f))
457 self._test_close_open_io(io_func)
458
459 def test_close_open_isatty(self):
460 def io_func():
461 self.f.isatty()
462 self._test_close_open_io(io_func)
463
464 def test_close_open_print(self):
465 def io_func():
466 print >> self.f, ''
467 self._test_close_open_io(io_func)
468
469 def test_close_open_read(self):
470 def io_func():
471 self.f.read(0)
472 self._test_close_open_io(io_func)
473
474 def test_close_open_readinto(self):
475 def io_func():
476 a = array('c', 'xxxxx')
477 self.f.readinto(a)
478 self._test_close_open_io(io_func)
479
480 def test_close_open_readline(self):
481 def io_func():
482 self.f.readline()
483 self._test_close_open_io(io_func)
484
485 def test_close_open_readlines(self):
486 def io_func():
487 self.f.readlines()
488 self._test_close_open_io(io_func)
489
490 def test_close_open_seek(self):
491 def io_func():
492 self.f.seek(0, 0)
493 self._test_close_open_io(io_func)
494
495 def test_close_open_tell(self):
496 def io_func():
497 self.f.tell()
498 self._test_close_open_io(io_func)
499
500 def test_close_open_truncate(self):
501 def io_func():
502 self.f.truncate()
503 self._test_close_open_io(io_func)
504
505 def test_close_open_write(self):
506 def io_func():
507 self.f.write('')
508 self._test_close_open_io(io_func)
509
510 def test_close_open_writelines(self):
511 def io_func():
512 self.f.writelines('')
513 self._test_close_open_io(io_func)
514
515
Amaury Forgeot d'Arcbdd941f2008-07-01 20:38:04 +0000516class StdoutTests(unittest.TestCase):
517
518 def test_move_stdout_on_write(self):
519 # Issue 3242: sys.stdout can be replaced (and freed) during a
520 # print statement; prevent a segfault in this case
521 save_stdout = sys.stdout
522
523 class File:
524 def write(self, data):
525 if '\n' in data:
526 sys.stdout = save_stdout
527
528 try:
529 sys.stdout = File()
530 print "some text"
531 finally:
532 sys.stdout = save_stdout
533
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000534
def test_main():
    """Run all test classes of this module and remove TESTFN afterwards."""
    test_classes = (AutoFileTests, OtherFileTests, FileSubclassTests,
                    FileThreadingTests, StdoutTests)
    # Historically, these tests have been sloppy about removing TESTFN.
    # So get rid of it no matter what.
    try:
        run_unittest(*test_classes)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)
Georg Brandl442b49e2006-06-08 14:50:53 +0000544
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()