blob: 399f119b81b870438f8856eb458ab051359d2e48 [file] [log] [blame]
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00001import sys
2import os
3import unittest
Antoine Pitrou47a5f482009-06-12 20:41:52 +00004import itertools
5import time
Antoine Pitrouc5d2b412009-06-12 20:36:25 +00006from array import array
7from weakref import proxy
Victor Stinner6a102812010-04-27 23:55:59 +00008try:
9 import threading
10except ImportError:
11 threading = None
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000012
Antoine Pitrou47a5f482009-06-12 20:41:52 +000013from test import test_support
Georg Brandla4f46e12010-02-07 17:03:15 +000014from test.test_support import TESTFN, run_unittest
Antoine Pitrouc5d2b412009-06-12 20:36:25 +000015from UserList import UserList
16
class AutoFileTests(unittest.TestCase):
    # file tests for which a test file is automatically set up
    #
    # setUp() opens TESTFN for binary writing as self.f; tearDown() closes
    # whatever self.f currently refers to (if anything) and removes TESTFN.

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # Tests may set self.f to None after closing it themselves; the
        # file on disk still has to be removed in that case.
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Drop the only strong reference so the proxy becomes dead.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        with test_support.check_py3k_warnings():
            softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        with test_support.check_py3k_warnings():
            # verify softspace is writable
            f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEqual('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1, 2, 3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testRepr(self):
        # verify repr works
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))

    def testErrors(self):
        # verify basic state queries and that readinto() rejects a str
        self.f.close()
        self.f = open(TESTFN, 'rb')
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # verify that every I/O method raises ValueError on a closed file
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', '__iter__']
        deprecated_methods = ['xreadlines']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        with test_support.check_py3k_warnings():
            for methodname in deprecated_methods:
                method = getattr(self.f, methodname)
                self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertEqual(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            # Only the deliberately provoked error is expected here; any
            # other exception should propagate instead of being swallowed.
            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        # reading from a file opened for writing must fail
        self.assertRaises(IOError, self.f.read)

    def testNastyWritelinesGenerator(self):
        # writelines() must notice that its own input closed the file
        def nasty():
            for i in range(5):
                if i == 3:
                    self.f.close()
                yield str(i)
        self.assertRaises(ValueError, self.f.writelines, nasty())

    def testIssue5677(self):
        # Remark: Do not perform more than one test per open file,
        # since that does NOT catch the readline error on Windows.
        data = 'xxx'
        # self.f is rebound repeatedly below; release the handle opened by
        # setUp() first instead of leaving it to the garbage collector.
        self.f.close()
        for mode in ['w', 'wb', 'a', 'ab']:
            for attr in ['read', 'readline', 'readlines']:
                self.f = open(TESTFN, mode)
                self.f.write(data)
                self.assertRaises(IOError, getattr(self.f, attr))
                self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, lambda: [line for line in self.f])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
            self.f.close()

        for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.write, data)
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.writelines, [data, data])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.truncate)
            self.f.close()
179
class OtherFileTests(unittest.TestCase):
    # file tests that create (and clean up) their own files as needed

    def testOpenDir(self):
        # opening a directory must raise IOError carrying the directory name
        this_dir = os.path.dirname(__file__) or os.curdir
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                ' Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # Clean up even when the assertion fails so that the file is
            # neither left open nor left behind on disk.
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # Start from a clean slate.  TESTFN only exists here if an earlier
        # test left it behind, so do not depend on test execution order.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            f = open(TESTFN, 'rb+')
            data = f.read(5)
            if data != '12345':
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation. People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = open(TESTFN)
                try:
                    if f.next() != filler:
                        self.fail("Broken testfile")
                    meth = getattr(f, methodname)
                    try:
                        meth(*args)
                    except ValueError:
                        pass
                    else:
                        self.fail("%s%r after next() didn't raise ValueError" %
                                  (methodname, args))
                finally:
                    f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner. Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = open(TESTFN)
            try:
                for i in range(nchunks):
                    f.next()
                testline = testlines.pop(0)
                try:
                    line = f.readline()
                except ValueError:
                    self.fail("readline() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("readline() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                testline = testlines.pop(0)
                buf = array("c", "\x00" * len(testline))
                try:
                    f.readinto(buf)
                except ValueError:
                    self.fail("readinto() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                line = buf.tostring()
                if line != testline:
                    self.fail("readinto() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                testline = testlines.pop(0)
                try:
                    line = f.read(len(testline))
                except ValueError:
                    self.fail("read() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("read() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                try:
                    lines = f.readlines()
                except ValueError:
                    self.fail("readlines() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if lines != testlines:
                    # Report the values that were actually compared.
                    self.fail("readlines() after next() with empty buffer "
                              "failed. Got %r, expected %r" %
                              (lines, testlines))
            finally:
                # Close before the file is reopened below and before the
                # outer finally clause unlinks it.
                f.close()
            # Reading after iteration hit EOF shouldn't hurt either
            f = open(TESTFN)
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)
407
class FileSubclassTests(unittest.TestCase):
    # tests for the behaviour of subclasses of the built-in file type

    def testExit(self):
        # test that exiting with context calls subclass' close
        class TrackingFile(file):
            # Records on the instance whether the overridden close() ran.
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)

            def close(self):
                self.subclass_closed = True
                file.close(self)

        with TrackingFile(TESTFN, 'w') as tracked:
            pass
        self.assertTrue(tracked.subclass_closed)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000423
424
Victor Stinner6a102812010-04-27 23:55:59 +0000425@unittest.skipUnless(threading, 'Threading required for this test.')
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000426class FileThreadingTests(unittest.TestCase):
427 # These tests check the ability to call various methods of file objects
428 # (including close()) concurrently without crashing the Python interpreter.
429 # See #815646, #595601
430
431 def setUp(self):
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000432 self._threads = test_support.threading_setup()
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000433 self.f = None
434 self.filename = TESTFN
435 with open(self.filename, "w") as f:
436 f.write("\n".join("0123456789"))
437 self._count_lock = threading.Lock()
438 self.close_count = 0
439 self.close_success_count = 0
Antoine Pitrou83137c22010-05-17 19:56:59 +0000440 self.use_buffering = False
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000441
442 def tearDown(self):
443 if self.f:
444 try:
445 self.f.close()
446 except (EnvironmentError, ValueError):
447 pass
448 try:
449 os.remove(self.filename)
450 except EnvironmentError:
451 pass
Antoine Pitrou0df2c732009-10-27 19:36:44 +0000452 test_support.threading_cleanup(*self._threads)
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000453
454 def _create_file(self):
Antoine Pitrou83137c22010-05-17 19:56:59 +0000455 if self.use_buffering:
456 self.f = open(self.filename, "w+", buffering=1024*16)
457 else:
458 self.f = open(self.filename, "w+")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000459
460 def _close_file(self):
461 with self._count_lock:
462 self.close_count += 1
463 self.f.close()
464 with self._count_lock:
465 self.close_success_count += 1
466
467 def _close_and_reopen_file(self):
468 self._close_file()
469 # if close raises an exception thats fine, self.f remains valid so
470 # we don't need to reopen.
471 self._create_file()
472
473 def _run_workers(self, func, nb_workers, duration=0.2):
474 with self._count_lock:
475 self.close_count = 0
476 self.close_success_count = 0
477 self.do_continue = True
478 threads = []
479 try:
480 for i in range(nb_workers):
481 t = threading.Thread(target=func)
482 t.start()
483 threads.append(t)
484 for _ in xrange(100):
485 time.sleep(duration/100)
486 with self._count_lock:
487 if self.close_count-self.close_success_count > nb_workers+1:
488 if test_support.verbose:
489 print 'Q',
490 break
491 time.sleep(duration)
492 finally:
493 self.do_continue = False
494 for t in threads:
495 t.join()
496
497 def _test_close_open_io(self, io_func, nb_workers=5):
498 def worker():
499 self._create_file()
500 funcs = itertools.cycle((
501 lambda: io_func(),
502 lambda: self._close_and_reopen_file(),
503 ))
504 for f in funcs:
505 if not self.do_continue:
506 break
507 try:
508 f()
509 except (IOError, ValueError):
510 pass
511 self._run_workers(worker, nb_workers)
512 if test_support.verbose:
513 # Useful verbose statistics when tuning this test to take
514 # less time to run but still ensuring that its still useful.
515 #
516 # the percent of close calls that raised an error
517 percent = 100. - 100.*self.close_success_count/self.close_count
518 print self.close_count, ('%.4f ' % percent),
519
520 def test_close_open(self):
521 def io_func():
522 pass
523 self._test_close_open_io(io_func)
524
525 def test_close_open_flush(self):
526 def io_func():
527 self.f.flush()
528 self._test_close_open_io(io_func)
529
530 def test_close_open_iter(self):
531 def io_func():
532 list(iter(self.f))
533 self._test_close_open_io(io_func)
534
535 def test_close_open_isatty(self):
536 def io_func():
537 self.f.isatty()
538 self._test_close_open_io(io_func)
539
540 def test_close_open_print(self):
541 def io_func():
542 print >> self.f, ''
543 self._test_close_open_io(io_func)
544
Antoine Pitrou83137c22010-05-17 19:56:59 +0000545 def test_close_open_print_buffered(self):
546 self.use_buffering = True
547 def io_func():
548 print >> self.f, ''
549 self._test_close_open_io(io_func)
550
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000551 def test_close_open_read(self):
552 def io_func():
553 self.f.read(0)
554 self._test_close_open_io(io_func)
555
556 def test_close_open_readinto(self):
557 def io_func():
558 a = array('c', 'xxxxx')
559 self.f.readinto(a)
560 self._test_close_open_io(io_func)
561
562 def test_close_open_readline(self):
563 def io_func():
564 self.f.readline()
565 self._test_close_open_io(io_func)
566
567 def test_close_open_readlines(self):
568 def io_func():
569 self.f.readlines()
570 self._test_close_open_io(io_func)
571
572 def test_close_open_seek(self):
573 def io_func():
574 self.f.seek(0, 0)
575 self._test_close_open_io(io_func)
576
577 def test_close_open_tell(self):
578 def io_func():
579 self.f.tell()
580 self._test_close_open_io(io_func)
581
582 def test_close_open_truncate(self):
583 def io_func():
584 self.f.truncate()
585 self._test_close_open_io(io_func)
586
587 def test_close_open_write(self):
588 def io_func():
589 self.f.write('')
590 self._test_close_open_io(io_func)
591
592 def test_close_open_writelines(self):
593 def io_func():
594 self.f.writelines('')
595 self._test_close_open_io(io_func)
596
597
598class StdoutTests(unittest.TestCase):
599
600 def test_move_stdout_on_write(self):
601 # Issue 3242: sys.stdout can be replaced (and freed) during a
602 # print statement; prevent a segfault in this case
603 save_stdout = sys.stdout
604
605 class File:
606 def write(self, data):
607 if '\n' in data:
608 sys.stdout = save_stdout
609
610 try:
611 sys.stdout = File()
612 print "some text"
613 finally:
614 sys.stdout = save_stdout
615
616 def test_del_stdout_before_print(self):
617 # Issue 4597: 'print' with no argument wasn't reporting when
618 # sys.stdout was deleted.
619 save_stdout = sys.stdout
620 del sys.stdout
621 try:
622 print
623 except RuntimeError as e:
Ezio Melotti2623a372010-11-21 13:34:58 +0000624 self.assertEqual(str(e), "lost sys.stdout")
Antoine Pitrou47a5f482009-06-12 20:41:52 +0000625 else:
626 self.fail("Expected RuntimeError")
627 finally:
628 sys.stdout = save_stdout
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000629
Victor Stinnercaafd772010-09-08 10:51:01 +0000630 def test_unicode(self):
631 import subprocess
632
633 def get_message(encoding, *code):
634 code = '\n'.join(code)
635 env = os.environ.copy()
636 env['PYTHONIOENCODING'] = encoding
637 process = subprocess.Popen([sys.executable, "-c", code],
638 stdout=subprocess.PIPE, env=env)
639 stdout, stderr = process.communicate()
640 self.assertEqual(process.returncode, 0)
641 return stdout
642
643 def check_message(text, encoding, expected):
644 stdout = get_message(encoding,
645 "import sys",
646 "sys.stdout.write(%r)" % text,
647 "sys.stdout.flush()")
648 self.assertEqual(stdout, expected)
649
Victor Stinner3a68f912010-09-08 11:45:16 +0000650 # test the encoding
651 check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
652 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
653 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
Victor Stinnercaafd772010-09-08 10:51:01 +0000654
Victor Stinner3a68f912010-09-08 11:45:16 +0000655 # test the error handler
656 check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
657 check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
658 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
659
660 # test the buffer API
Victor Stinnercaafd772010-09-08 10:51:01 +0000661 for objtype in ('buffer', 'bytearray'):
662 stdout = get_message('ascii',
663 'import sys',
Victor Stinner3a68f912010-09-08 11:45:16 +0000664 r'sys.stdout.write(%s("\xe9"))' % objtype,
665 'sys.stdout.flush()')
666 self.assertEqual(stdout, "\xe9")
Victor Stinnercaafd772010-09-08 10:51:01 +0000667
Antoine Pitrouc5d2b412009-06-12 20:36:25 +0000668
def test_main():
    # Historically, these tests have been sloppy about removing TESTFN.
    # So get rid of it no matter what.
    test_classes = (AutoFileTests, OtherFileTests, FileSubclassTests,
                    FileThreadingTests, StdoutTests)
    try:
        run_unittest(*test_classes)
    finally:
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)


if __name__ == '__main__':
    test_main()