blob: 7cfaef76fedfaca3e231ea44e493b6814d220676 [file] [log] [blame]
Martin v. Löwisf90ae202002-06-11 06:22:31 +00001import sys
Fred Drake2ec80fa2000-10-23 16:59:35 +00002import os
Georg Brandl442b49e2006-06-08 14:50:53 +00003import unittest
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +00004import itertools
5import time
6import threading
Neal Norwitz62f5a9d2002-04-01 00:09:00 +00007from array import array
Raymond Hettingercb87bc82004-05-31 00:35:52 +00008from weakref import proxy
Fred Drake2ec80fa2000-10-23 16:59:35 +00009
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +000010from test import test_support
Georg Brandl442b49e2006-06-08 14:50:53 +000011from test.test_support import TESTFN, findfile, run_unittest
Marc-André Lemburgfa44d792000-08-25 22:37:31 +000012from UserList import UserList
13
class AutoFileTests(unittest.TestCase):
    # File tests for which a test file is automatically set up: setUp()
    # opens TESTFN for binary writing as self.f, and tearDown() closes it
    # (unless the test has already set self.f to None) and removes TESTFN.

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # Remove the file even when close() raises, so that one failing
        # test does not leave TESTFN behind for the next one.
        try:
            if self.f:
                self.f.close()
        finally:
            os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEquals(self.f.tell(), p.tell())
        self.f.close()
        # Drop the only strong reference; the proxy must go dead.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        # verify softspace is writable
        f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEquals('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEquals(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testRepr(self):
        # verify repr works
        self.assert_(repr(self.f).startswith("<open file '" + TESTFN))

    def testErrors(self):
        f = self.f
        self.assertEquals(f.name, TESTFN)
        self.assert_(not f.isatty())
        self.assert_(not f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assert_(f.closed)

    def testMethods(self):
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', 'xreadlines', '__iter__']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assert_(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertEquals(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1/0
        except ZeroDivisionError:
            # Catch only the exception raised on purpose above; a bare
            # "except:" would also swallow KeyboardInterrupt/SystemExit.
            self.assertEquals(self.f.__exit__(*sys.exc_info()), None)
122
Georg Brandl442b49e2006-06-08 14:50:53 +0000123
124class OtherFileTests(unittest.TestCase):
125
126 def testModeStrings(self):
127 # check invalid mode strings
128 for mode in ("", "aU", "wU+"):
129 try:
Tim Petersdbb82f62006-06-09 03:51:41 +0000130 f = open(TESTFN, mode)
Georg Brandl442b49e2006-06-08 14:50:53 +0000131 except ValueError:
132 pass
133 else:
134 f.close()
135 self.fail('%r is an invalid file mode' % mode)
136
137 def testStdin(self):
138 # This causes the interpreter to exit on OSF1 v5.1.
139 if sys.platform != 'osf1V5':
140 self.assertRaises(IOError, sys.stdin.seek, -1)
Thomas Woutersc45251a2006-02-12 11:53:32 +0000141 else:
Georg Brandl442b49e2006-06-08 14:50:53 +0000142 print >>sys.__stdout__, (
143 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
144 ' Test manually.')
145 self.assertRaises(IOError, sys.stdin.truncate)
146
147 def testUnicodeOpen(self):
148 # verify repr works for unicode too
149 f = open(unicode(TESTFN), "w")
150 self.assert_(repr(f).startswith("<open file u'" + TESTFN))
Thomas Woutersc45251a2006-02-12 11:53:32 +0000151 f.close()
Tim Peters0556e9b2006-06-09 04:02:06 +0000152 os.unlink(TESTFN)
Thomas Woutersc45251a2006-02-12 11:53:32 +0000153
Georg Brandl442b49e2006-06-08 14:50:53 +0000154 def testBadModeArgument(self):
155 # verify that we get a sensible error message for bad mode argument
156 bad_mode = "qwerty"
Tim Peterscffcfed2006-02-14 17:41:18 +0000157 try:
Georg Brandl442b49e2006-06-08 14:50:53 +0000158 f = open(TESTFN, bad_mode)
159 except ValueError, msg:
160 if msg[0] != 0:
161 s = str(msg)
162 if s.find(TESTFN) != -1 or s.find(bad_mode) == -1:
163 self.fail("bad error message for invalid mode: %s" % s)
164 # if msg[0] == 0, we're probably on Windows where there may be
165 # no obvious way to discover why open() failed.
166 else:
167 f.close()
168 self.fail("no error for invalid mode: %s" % bad_mode)
169
170 def testSetBufferSize(self):
171 # make sure that explicitly setting the buffer size doesn't cause
172 # misbehaviour especially with repeated close() calls
173 for s in (-1, 0, 1, 512):
174 try:
175 f = open(TESTFN, 'w', s)
176 f.write(str(s))
177 f.close()
178 f.close()
179 f = open(TESTFN, 'r', s)
180 d = int(f.read())
181 f.close()
182 f.close()
183 except IOError, msg:
184 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
185 self.assertEquals(d, s)
186
187 def testTruncateOnWindows(self):
188 os.unlink(TESTFN)
189
190 def bug801631():
191 # SF bug <http://www.python.org/sf/801631>
192 # "file.truncate fault on windows"
Tim Petersdbb82f62006-06-09 03:51:41 +0000193 f = open(TESTFN, 'wb')
Georg Brandl442b49e2006-06-08 14:50:53 +0000194 f.write('12345678901') # 11 bytes
195 f.close()
196
Tim Petersdbb82f62006-06-09 03:51:41 +0000197 f = open(TESTFN,'rb+')
Georg Brandl442b49e2006-06-08 14:50:53 +0000198 data = f.read(5)
199 if data != '12345':
200 self.fail("Read on file opened for update failed %r" % data)
201 if f.tell() != 5:
202 self.fail("File pos after read wrong %d" % f.tell())
203
204 f.truncate()
205 if f.tell() != 5:
206 self.fail("File pos after ftruncate wrong %d" % f.tell())
207
208 f.close()
209 size = os.path.getsize(TESTFN)
210 if size != 5:
211 self.fail("File size after ftruncate wrong %d" % size)
212
213 try:
214 bug801631()
215 finally:
216 os.unlink(TESTFN)
217
218 def testIteration(self):
Tim Petersdbb82f62006-06-09 03:51:41 +0000219 # Test the complex interaction when mixing file-iteration and the
220 # various read* methods. Ostensibly, the mixture could just be tested
221 # to work when it should work according to the Python language,
222 # instead of fail when it should fail according to the current CPython
223 # implementation. People don't always program Python the way they
224 # should, though, and the implemenation might change in subtle ways,
225 # so we explicitly test for errors, too; the test will just have to
226 # be updated when the implementation changes.
Georg Brandl442b49e2006-06-08 14:50:53 +0000227 dataoffset = 16384
228 filler = "ham\n"
229 assert not dataoffset % len(filler), \
230 "dataoffset must be multiple of len(filler)"
231 nchunks = dataoffset // len(filler)
232 testlines = [
233 "spam, spam and eggs\n",
234 "eggs, spam, ham and spam\n",
235 "saussages, spam, spam and eggs\n",
236 "spam, ham, spam and eggs\n",
237 "spam, spam, spam, spam, spam, ham, spam\n",
238 "wonderful spaaaaaam.\n"
239 ]
240 methods = [("readline", ()), ("read", ()), ("readlines", ()),
241 ("readinto", (array("c", " "*100),))]
242
243 try:
244 # Prepare the testfile
245 bag = open(TESTFN, "w")
246 bag.write(filler * nchunks)
247 bag.writelines(testlines)
248 bag.close()
249 # Test for appropriate errors mixing read* and iteration
250 for methodname, args in methods:
251 f = open(TESTFN)
252 if f.next() != filler:
253 self.fail, "Broken testfile"
254 meth = getattr(f, methodname)
255 try:
256 meth(*args)
257 except ValueError:
258 pass
259 else:
260 self.fail("%s%r after next() didn't raise ValueError" %
261 (methodname, args))
262 f.close()
263
Tim Petersdbb82f62006-06-09 03:51:41 +0000264 # Test to see if harmless (by accident) mixing of read* and
265 # iteration still works. This depends on the size of the internal
266 # iteration buffer (currently 8192,) but we can test it in a
267 # flexible manner. Each line in the bag o' ham is 4 bytes
268 # ("h", "a", "m", "\n"), so 4096 lines of that should get us
269 # exactly on the buffer boundary for any power-of-2 buffersize
270 # between 4 and 16384 (inclusive).
Georg Brandl442b49e2006-06-08 14:50:53 +0000271 f = open(TESTFN)
272 for i in range(nchunks):
273 f.next()
274 testline = testlines.pop(0)
275 try:
276 line = f.readline()
277 except ValueError:
278 self.fail("readline() after next() with supposedly empty "
279 "iteration-buffer failed anyway")
280 if line != testline:
281 self.fail("readline() after next() with empty buffer "
282 "failed. Got %r, expected %r" % (line, testline))
283 testline = testlines.pop(0)
284 buf = array("c", "\x00" * len(testline))
285 try:
286 f.readinto(buf)
287 except ValueError:
288 self.fail("readinto() after next() with supposedly empty "
289 "iteration-buffer failed anyway")
290 line = buf.tostring()
291 if line != testline:
292 self.fail("readinto() after next() with empty buffer "
293 "failed. Got %r, expected %r" % (line, testline))
294
295 testline = testlines.pop(0)
296 try:
297 line = f.read(len(testline))
298 except ValueError:
299 self.fail("read() after next() with supposedly empty "
300 "iteration-buffer failed anyway")
301 if line != testline:
302 self.fail("read() after next() with empty buffer "
303 "failed. Got %r, expected %r" % (line, testline))
304 try:
305 lines = f.readlines()
306 except ValueError:
307 self.fail("readlines() after next() with supposedly empty "
308 "iteration-buffer failed anyway")
309 if lines != testlines:
310 self.fail("readlines() after next() with empty buffer "
311 "failed. Got %r, expected %r" % (line, testline))
312 # Reading after iteration hit EOF shouldn't hurt either
313 f = open(TESTFN)
314 try:
315 for line in f:
316 pass
317 try:
318 f.readline()
319 f.readinto(buf)
320 f.read()
321 f.readlines()
322 except ValueError:
323 self.fail("read* failed after next() consumed file")
324 finally:
325 f.close()
326 finally:
327 os.unlink(TESTFN)
328
class FileSubclassTests(unittest.TestCase):
    # Tests for classes derived from the built-in file type.

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                # Record that the override ran before delegating.
                self.subclass_closed = True
                file.close(self)

        try:
            with C(TESTFN, 'w') as f:
                pass
            self.failUnless(f.subclass_closed)
        finally:
            # Opening in 'w' mode created TESTFN; do not leave it behind.
            if os.path.exists(TESTFN):
                os.unlink(TESTFN)
344
Georg Brandl442b49e2006-06-08 14:50:53 +0000345
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000346class FileThreadingTests(unittest.TestCase):
347 # These tests check the ability to call various methods of file objects
348 # (including close()) concurrently without crashing the Python interpreter.
349 # See #815646, #595601
350
351 def setUp(self):
352 self.f = None
353 self.filename = TESTFN
354 with open(self.filename, "w") as f:
355 f.write("\n".join("0123456789"))
356 self._count_lock = threading.Lock()
357 self.close_count = 0
358 self.close_success_count = 0
359
360 def tearDown(self):
361 if self.f:
362 try:
363 self.f.close()
364 except (EnvironmentError, ValueError):
365 pass
366 try:
367 os.remove(self.filename)
368 except EnvironmentError:
369 pass
370
371 def _create_file(self):
372 self.f = open(self.filename, "w+")
373
374 def _close_file(self):
375 with self._count_lock:
376 self.close_count += 1
377 self.f.close()
378 with self._count_lock:
379 self.close_success_count += 1
380
381 def _close_and_reopen_file(self):
382 self._close_file()
383 # if close raises an exception thats fine, self.f remains valid so
384 # we don't need to reopen.
385 self._create_file()
386
387 def _run_workers(self, func, nb_workers, duration=0.2):
388 with self._count_lock:
389 self.close_count = 0
390 self.close_success_count = 0
391 self.do_continue = True
392 threads = []
393 try:
394 for i in range(nb_workers):
395 t = threading.Thread(target=func)
396 t.start()
397 threads.append(t)
398 for _ in xrange(100):
399 time.sleep(duration/100)
400 with self._count_lock:
401 if self.close_count-self.close_success_count > nb_workers+1:
402 if test_support.verbose:
403 print 'Q',
404 break
405 time.sleep(duration)
406 finally:
407 self.do_continue = False
408 for t in threads:
409 t.join()
410
411 def _test_close_open_io(self, io_func, nb_workers=5):
412 def worker():
413 self._create_file()
414 funcs = itertools.cycle((
415 lambda: io_func(),
416 lambda: self._close_and_reopen_file(),
417 ))
418 for f in funcs:
419 if not self.do_continue:
420 break
421 try:
422 f()
423 except (IOError, ValueError):
424 pass
425 self._run_workers(worker, nb_workers)
426 if test_support.verbose:
427 # Useful verbose statistics when tuning this test to take
428 # less time to run but still ensuring that its still useful.
429 #
430 # the percent of close calls that raised an error
431 percent = 100. - 100.*self.close_success_count/self.close_count
432 print self.close_count, ('%.4f ' % percent),
433
434 def test_close_open(self):
435 def io_func():
436 pass
437 self._test_close_open_io(io_func)
438
439 def test_close_open_flush(self):
440 def io_func():
441 self.f.flush()
442 self._test_close_open_io(io_func)
443
444 def test_close_open_iter(self):
445 def io_func():
446 list(iter(self.f))
447 self._test_close_open_io(io_func)
448
449 def test_close_open_isatty(self):
450 def io_func():
451 self.f.isatty()
452 self._test_close_open_io(io_func)
453
454 def test_close_open_print(self):
455 def io_func():
456 print >> self.f, ''
457 self._test_close_open_io(io_func)
458
459 def test_close_open_read(self):
460 def io_func():
461 self.f.read(0)
462 self._test_close_open_io(io_func)
463
464 def test_close_open_readinto(self):
465 def io_func():
466 a = array('c', 'xxxxx')
467 self.f.readinto(a)
468 self._test_close_open_io(io_func)
469
470 def test_close_open_readline(self):
471 def io_func():
472 self.f.readline()
473 self._test_close_open_io(io_func)
474
475 def test_close_open_readlines(self):
476 def io_func():
477 self.f.readlines()
478 self._test_close_open_io(io_func)
479
480 def test_close_open_seek(self):
481 def io_func():
482 self.f.seek(0, 0)
483 self._test_close_open_io(io_func)
484
485 def test_close_open_tell(self):
486 def io_func():
487 self.f.tell()
488 self._test_close_open_io(io_func)
489
490 def test_close_open_truncate(self):
491 def io_func():
492 self.f.truncate()
493 self._test_close_open_io(io_func)
494
495 def test_close_open_write(self):
496 def io_func():
497 self.f.write('')
498 self._test_close_open_io(io_func)
499
500 def test_close_open_writelines(self):
501 def io_func():
502 self.f.writelines('')
503 self._test_close_open_io(io_func)
504
505
Amaury Forgeot d'Arcbdd941f2008-07-01 20:38:04 +0000506class StdoutTests(unittest.TestCase):
507
508 def test_move_stdout_on_write(self):
509 # Issue 3242: sys.stdout can be replaced (and freed) during a
510 # print statement; prevent a segfault in this case
511 save_stdout = sys.stdout
512
513 class File:
514 def write(self, data):
515 if '\n' in data:
516 sys.stdout = save_stdout
517
518 try:
519 sys.stdout = File()
520 print "some text"
521 finally:
522 sys.stdout = save_stdout
523
Gregory P. Smithaa63d0d2008-04-06 23:11:17 +0000524
def test_main():
    # Historically, these tests have been sloppy about removing TESTFN.
    # So get rid of it no matter what.
    test_classes = (
        AutoFileTests,
        OtherFileTests,
        FileSubclassTests,
        FileThreadingTests,
        StdoutTests,
    )
    try:
        run_unittest(*test_classes)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)


if __name__ == '__main__':
    test_main()